1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-09-16 09:26:25 +02:00

New api/label package, common label set impl (#651)

* New label set API

* Checkpoint

* Remove label.Labels interface

* Fix trace

* Remove label storage

* Restore metric_test.go

* Tidy tests

* More comments

* More comments

* Same changes as 654

* Checkpoint

* Fix batch labels

* Avoid Resource.Attributes() where possible

* Update comments and restore order in resource.go

* From feedback

* From feedback

* Move iterator_test & feedback

* Strenghten the label.Set test

* Feedback on typos

* Fix the set test per @krnowak

* Nit
This commit is contained in:
Joshua MacDonald
2020-04-23 12:10:58 -07:00
committed by GitHub
parent acb350b8f3
commit 0bb12d9b1b
44 changed files with 1160 additions and 943 deletions

152
api/label/encoder.go Normal file
View File

@@ -0,0 +1,152 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package label
import (
"bytes"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel/api/core"
)
type (
// Encoder is a mechanism for serializing a label set into a
// specific string representation that supports caching, to
// avoid repeated serialization. An example could be an
// exporter encoding the label set into a wire representation.
Encoder interface {
// Encode returns the serialized encoding of the label
// set using its Iterator. This result may be cached
// by a label.Set.
Encode(Iterator) string
// ID returns a value that is unique for each class of
// label encoder. Label encoders allocate these using
// `NewEncoderID`.
ID() EncoderID
}
// EncoderID is used to identify distinct Encoder
// implementations, for caching encoded results.
EncoderID struct {
value uint64
}
// defaultLabelEncoder uses a sync.Pool of buffers to reduce
// the number of allocations used in encoding labels. This
// implementation encodes a comma-separated list of key=value,
// with '/'-escaping of '=', ',', and '\'.
defaultLabelEncoder struct {
// pool is a pool of labelset builders. The buffers in this
// pool grow to a size that most label encodings will not
// allocate new memory.
pool sync.Pool // *bytes.Buffer
}
)
// escapeChar is used to ensure uniqueness of the label encoding where
// keys or values contain either '=' or ','. Since there is no parser
// needed for this encoding and its only requirement is to be unique,
// this choice is arbitrary. Users will see these in some exporters
// (e.g., stdout), so the backslash ('\') is used as a conventional choice.
const escapeChar = '\\'
var (
_ Encoder = &defaultLabelEncoder{}
// encoderIDCounter is for generating IDs for other label
// encoders.
encoderIDCounter uint64
defaultEncoderOnce sync.Once
defaultEncoderID = NewEncoderID()
defaultEncoderInstance *defaultLabelEncoder
)
// NewEncoderID returns a unique label encoder ID. It should be
// called once per each type of label encoder. Preferably in init() or
// in var definition.
func NewEncoderID() EncoderID {
return EncoderID{value: atomic.AddUint64(&encoderIDCounter, 1)}
}
// DefaultEncoder returns a label encoder that encodes labels
// in such a way that each escaped label's key is followed by an equal
// sign and then by an escaped label's value. All key-value pairs are
// separated by a comma.
//
// Escaping is done by prepending a backslash before either a
// backslash, equal sign or a comma.
func DefaultEncoder() Encoder {
defaultEncoderOnce.Do(func() {
defaultEncoderInstance = &defaultLabelEncoder{
pool: sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
},
}
})
return defaultEncoderInstance
}
// Encode is a part of an implementation of the LabelEncoder
// interface.
func (d *defaultLabelEncoder) Encode(iter Iterator) string {
buf := d.pool.Get().(*bytes.Buffer)
defer d.pool.Put(buf)
buf.Reset()
for iter.Next() {
i, kv := iter.IndexedLabel()
if i > 0 {
_, _ = buf.WriteRune(',')
}
copyAndEscape(buf, string(kv.Key))
_, _ = buf.WriteRune('=')
if kv.Value.Type() == core.STRING {
copyAndEscape(buf, kv.Value.AsString())
} else {
_, _ = buf.WriteString(kv.Value.Emit())
}
}
return buf.String()
}
// ID is a part of an implementation of the LabelEncoder interface.
func (*defaultLabelEncoder) ID() EncoderID {
return defaultEncoderID
}
// copyAndEscape escapes `=`, `,` and its own escape character (`\`),
// making the default encoding unique.
func copyAndEscape(buf *bytes.Buffer, val string) {
for _, ch := range val {
switch ch {
case '=', ',', escapeChar:
buf.WriteRune(escapeChar)
}
buf.WriteRune(ch)
}
}
// Valid returns true if this encoder ID was allocated by
// `NewEncoderID`. Invalid encoder IDs will not be cached.
func (id EncoderID) Valid() bool {
return id.value != 0
}

77
api/label/iterator.go Normal file
View File

@@ -0,0 +1,77 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package label
import (
"go.opentelemetry.io/otel/api/core"
)
// Iterator allows iterating over the set of labels in order,
// sorted by key.
type Iterator struct {
storage *Set
idx int
}
// Next moves the iterator to the next position. Returns false if there
// are no more labels.
func (i *Iterator) Next() bool {
i.idx++
return i.idx < i.Len()
}
// Label returns current core.KeyValue. Must be called only after Next returns
// true.
func (i *Iterator) Label() core.KeyValue {
kv, _ := i.storage.Get(i.idx)
return kv
}
// Attribute is a synonym for Label().
func (i *Iterator) Attribute() core.KeyValue {
return i.Label()
}
// IndexedLabel returns current index and label. Must be called only
// after Next returns true.
func (i *Iterator) IndexedLabel() (int, core.KeyValue) {
return i.idx, i.Label()
}
// IndexedAttribute is a synonym for IndexedLabel().
func (i *Iterator) IndexedAttribute() (int, core.KeyValue) {
return i.IndexedLabel()
}
// Len returns a number of labels in the iterator's `*Set`.
func (i *Iterator) Len() int {
return i.storage.Len()
}
// ToSlice is a convenience function that creates a slice of labels
// from the passed iterator. The iterator is set up to start from the
// beginning before creating the slice.
func (i *Iterator) ToSlice() []core.KeyValue {
l := i.Len()
if l == 0 {
return nil
}
i.idx = -1
slice := make([]core.KeyValue, 0, l)
for i.Next() {
slice = append(slice, i.Label())
}
return slice
}

View File

@@ -12,33 +12,34 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package resource
package label_test
import (
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/label"
)
func TestAttributeIterator(t *testing.T) {
func TestIterator(t *testing.T) {
one := key.String("one", "1")
two := key.Int("two", 2)
iter := NewAttributeIterator([]core.KeyValue{one, two})
lbl := label.NewSet(one, two)
iter := lbl.Iter()
require.Equal(t, 2, iter.Len())
require.True(t, iter.Next())
require.Equal(t, one, iter.Attribute())
idx, attr := iter.IndexedAttribute()
require.Equal(t, one, iter.Label())
idx, attr := iter.IndexedLabel()
require.Equal(t, 0, idx)
require.Equal(t, one, attr)
require.Equal(t, 2, iter.Len())
require.True(t, iter.Next())
require.Equal(t, two, iter.Attribute())
idx, attr = iter.IndexedAttribute()
require.Equal(t, two, iter.Label())
idx, attr = iter.IndexedLabel()
require.Equal(t, 1, idx)
require.Equal(t, two, attr)
require.Equal(t, 2, iter.Len())
@@ -47,8 +48,9 @@ func TestAttributeIterator(t *testing.T) {
require.Equal(t, 2, iter.Len())
}
func TestEmptyAttributeIterator(t *testing.T) {
iter := NewAttributeIterator(nil)
func TestEmptyIterator(t *testing.T) {
lbl := label.NewSet()
iter := lbl.Iter()
require.Equal(t, 0, iter.Len())
require.False(t, iter.Next())
}

389
api/label/set.go Normal file
View File

@@ -0,0 +1,389 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package label // import "go.opentelemetry.io/otel/api/label"
import (
"encoding/json"
"reflect"
"sort"
"sync"
"go.opentelemetry.io/otel/api/core"
)
type (
// Set is the representation for a distinct label set. It
// manages an immutable set of labels, with an internal cache
// for storing label encodings.
//
// This type supports the `Equivalent` method of comparison
// using values of type `Distinct`.
//
// This type is used to implement:
// 1. Metric labels
// 2. Resource sets
// 3. Correlation map (TODO)
Set struct {
equivalent Distinct
lock sync.Mutex
encoders [maxConcurrentEncoders]EncoderID
encoded [maxConcurrentEncoders]string
}
// Distinct wraps a variable-size array of `core.KeyValue`,
// constructed with keys in sorted order. This can be used as
// a map key or for equality checking between Sets.
Distinct struct {
iface interface{}
}
// Sortable implements `sort.Interface`, used for sorting
// `core.KeyValue`. This is an exported type to support a
// memory optimization. A pointer to one of these is needed
// for the call to `sort.Stable()`, which the caller may
// provide in order to avoid an allocation. See
// `NewSetWithSortable()`.
Sortable []core.KeyValue
)
var (
// keyValueType is used in `computeDistinctReflect`.
keyValueType = reflect.TypeOf(core.KeyValue{})
// emptySet is returned for empty label sets.
emptySet = &Set{
equivalent: Distinct{
iface: [0]core.KeyValue{},
},
}
)
const maxConcurrentEncoders = 3
func EmptySet() *Set {
return emptySet
}
// reflect abbreviates `reflect.ValueOf`.
func (d Distinct) reflect() reflect.Value {
return reflect.ValueOf(d.iface)
}
// Valid returns true if this value refers to a valid `*Set`.
func (d Distinct) Valid() bool {
return d.iface != nil
}
// Len returns the number of labels in this set.
func (l *Set) Len() int {
if l == nil || !l.equivalent.Valid() {
return 0
}
return l.equivalent.reflect().Len()
}
// Get returns the KeyValue at ordered position `idx` in this set.
func (l *Set) Get(idx int) (core.KeyValue, bool) {
if l == nil {
return core.KeyValue{}, false
}
value := l.equivalent.reflect()
if idx >= 0 && idx < value.Len() {
// Note: The Go compiler successfully avoids an allocation for
// the interface{} conversion here:
return value.Index(idx).Interface().(core.KeyValue), true
}
return core.KeyValue{}, false
}
// Value returns the value of a specified key in this set.
func (l *Set) Value(k core.Key) (core.Value, bool) {
if l == nil {
return core.Value{}, false
}
value := l.equivalent.reflect()
vlen := value.Len()
idx := sort.Search(vlen, func(idx int) bool {
return value.Index(idx).Interface().(core.KeyValue).Key >= k
})
if idx >= vlen {
return core.Value{}, false
}
kv := value.Index(idx).Interface().(core.KeyValue)
if k == kv.Key {
return kv.Value, true
}
return core.Value{}, false
}
// HasValue tests whether a key is defined in this set.
func (l *Set) HasValue(k core.Key) bool {
if l == nil {
return false
}
_, ok := l.Value(k)
return ok
}
// Iter returns an iterator for visiting the labels in this set.
func (l *Set) Iter() Iterator {
return Iterator{
storage: l,
idx: -1,
}
}
// ToSlice returns the set of labels belonging to this set, sorted,
// where keys appear no more than once.
func (l *Set) ToSlice() []core.KeyValue {
iter := l.Iter()
return iter.ToSlice()
}
// Equivalent returns a value that may be used as a map key. The
// Distinct type guarantees that the result will equal the equivalent
// Distinct value of any label set with the same elements as this,
// where sets are made unique by choosing the last value in the input
// for any given key.
func (l *Set) Equivalent() Distinct {
if l == nil || !l.equivalent.Valid() {
return emptySet.equivalent
}
return l.equivalent
}
// Equals returns true if the argument set is equivalent to this set.
func (l *Set) Equals(o *Set) bool {
return l.Equivalent() == o.Equivalent()
}
// Encoded returns the encoded form of this set, according to
// `encoder`. The result will be cached in this `*Set`.
func (l *Set) Encoded(encoder Encoder) string {
if l == nil || encoder == nil {
return ""
}
id := encoder.ID()
if !id.Valid() {
// Invalid IDs are not cached.
return encoder.Encode(l.Iter())
}
var lookup *string
l.lock.Lock()
for idx := 0; idx < maxConcurrentEncoders; idx++ {
if l.encoders[idx] == id {
lookup = &l.encoded[idx]
break
}
}
l.lock.Unlock()
if lookup != nil {
return *lookup
}
r := encoder.Encode(l.Iter())
l.lock.Lock()
defer l.lock.Unlock()
for idx := 0; idx < maxConcurrentEncoders; idx++ {
if l.encoders[idx] == id {
return l.encoded[idx]
}
if !l.encoders[idx].Valid() {
l.encoders[idx] = id
l.encoded[idx] = r
return r
}
}
// TODO: This is a performance cliff. Find a way for this to
// generate a warning.
return r
}
// NewSet returns a new `*Set`. See the documentation for
// `NewSetWithSortable` for more details.
//
// Except for empty sets, this method adds an additional allocation
// compared with a call to `NewSetWithSortable`.
func NewSet(kvs ...core.KeyValue) Set {
// Check for empty set.
if len(kvs) == 0 {
return Set{
equivalent: emptySet.equivalent,
}
}
return NewSetWithSortable(kvs, new(Sortable))
}
// NewSetWithSortable returns a new `*Set`.
//
// Duplicate keys are eliminated by taking the last value. This
// re-orders the input slice so that unique last-values are contiguous
// at the end of the slice.
//
// This ensures the following:
//
// - Last-value-wins semantics
// - Caller sees the reordering, but doesn't lose values
// - Repeated call preserve last-value wins.
//
// Note that methods are defined `*Set`, although no allocation for
// `Set` is required. Callers can avoid memory allocations by:
//
// - allocating a `Sortable` for use as a temporary in this method
// - allocating a `Set` for storing the return value of this
// constructor.
//
// The result maintains a cache of encoded labels, by label.EncoderID.
// This value should not be copied after its first use.
func NewSetWithSortable(kvs []core.KeyValue, tmp *Sortable) Set {
// Check for empty set.
if len(kvs) == 0 {
return Set{
equivalent: emptySet.equivalent,
}
}
*tmp = kvs
// Stable sort so the following de-duplication can implement
// last-value-wins semantics.
sort.Stable(tmp)
*tmp = nil
position := len(kvs) - 1
offset := position - 1
// The requirements stated above require that the stable
// result be placed in the end of the input slice, while
// overwritten values are swapped to the beginning.
//
// De-duplicate with last-value-wins semantics. Preserve
// duplicate values at the beginning of the input slice.
for ; offset >= 0; offset-- {
if kvs[offset].Key == kvs[position].Key {
continue
}
kvs[offset], kvs[position-1] = kvs[position-1], kvs[offset]
position--
}
return Set{
equivalent: computeDistinct(kvs[position:]),
}
}
// computeDistinct returns a `Distinct` using either the fixed- or
// reflect-oriented code path, depending on the size of the input.
// The input slice is assumed to already be sorted and de-duplicated.
func computeDistinct(kvs []core.KeyValue) Distinct {
iface := computeDistinctFixed(kvs)
if iface == nil {
iface = computeDistinctReflect(kvs)
}
return Distinct{
iface: iface,
}
}
// computeDistinctFixed computes a `Distinct` for small slices. It
// returns nil if the input is too large for this code path.
func computeDistinctFixed(kvs []core.KeyValue) interface{} {
switch len(kvs) {
case 1:
ptr := new([1]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 2:
ptr := new([2]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 3:
ptr := new([3]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 4:
ptr := new([4]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 5:
ptr := new([5]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 6:
ptr := new([6]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 7:
ptr := new([7]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 8:
ptr := new([8]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 9:
ptr := new([9]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 10:
ptr := new([10]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
default:
return nil
}
}
// computeDistinctReflect computes a `Distinct` using reflection,
// works for any size input.
func computeDistinctReflect(kvs []core.KeyValue) interface{} {
at := reflect.New(reflect.ArrayOf(len(kvs), keyValueType)).Elem()
for i, kv := range kvs {
*(at.Index(i).Addr().Interface().(*core.KeyValue)) = kv
}
return at.Interface()
}
// MarshalJSON returns the JSON encoding of the `*Set`.
func (l *Set) MarshalJSON() ([]byte, error) {
return json.Marshal(l.equivalent.iface)
}
// Len implements `sort.Interface`.
func (l *Sortable) Len() int {
return len(*l)
}
// Swap implements `sort.Interface`.
func (l *Sortable) Swap(i, j int) {
(*l)[i], (*l)[j] = (*l)[j], (*l)[i]
}
// Less implements `sort.Interface`.
func (l *Sortable) Less(i, j int) bool {
return (*l)[i].Key < (*l)[j].Key
}

117
api/label/set_test.go Normal file
View File

@@ -0,0 +1,117 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package label_test
import (
"testing"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/label"
"github.com/stretchr/testify/require"
)
type testCase struct {
kvs []core.KeyValue
encoding string
}
func expect(enc string, kvs ...core.KeyValue) testCase {
return testCase{
kvs: kvs,
encoding: enc,
}
}
func TestSetDedup(t *testing.T) {
cases := []testCase{
expect("A=B", key.String("A", "2"), key.String("A", "B")),
expect("A=B", key.String("A", "2"), key.Int("A", 1), key.String("A", "B")),
expect("A=B", key.String("A", "B"), key.String("A", "C"), key.String("A", "D"), key.String("A", "B")),
expect("A=B,C=D", key.String("A", "1"), key.String("C", "D"), key.String("A", "B")),
expect("A=B,C=D", key.String("A", "2"), key.String("A", "B"), key.String("C", "D")),
expect("A=B,C=D", key.Float64("C", 1.2), key.String("A", "2"), key.String("A", "B"), key.String("C", "D")),
expect("A=B,C=D", key.String("C", "D"), key.String("A", "B"), key.String("A", "C"), key.String("A", "D"), key.String("A", "B")),
expect("A=B,C=D", key.String("A", "B"), key.String("C", "D"), key.String("A", "C"), key.String("A", "D"), key.String("A", "B")),
expect("A=B,C=D", key.String("A", "B"), key.String("A", "C"), key.String("A", "D"), key.String("A", "B"), key.String("C", "D")),
}
enc := label.DefaultEncoder()
s2d := map[string][]label.Distinct{}
d2s := map[label.Distinct][]string{}
for _, tc := range cases {
cpy := make([]core.KeyValue, len(tc.kvs))
copy(cpy, tc.kvs)
sl := label.NewSet(cpy...)
// Ensure that the input was reordered but no elements went missing.
require.ElementsMatch(t, tc.kvs, cpy)
str := sl.Encoded(enc)
equ := sl.Equivalent()
s2d[str] = append(s2d[str], equ)
d2s[equ] = append(d2s[equ], str)
require.Equal(t, tc.encoding, str)
}
for s, d := range s2d {
// No other Distinct values are equal to this.
for s2, d2 := range s2d {
if s2 == s {
continue
}
for _, elt := range d {
for _, otherDistinct := range d2 {
require.NotEqual(t, otherDistinct, elt)
}
}
}
for _, strings := range d2s {
if strings[0] == s {
continue
}
for _, otherString := range strings {
require.NotEqual(t, otherString, s)
}
}
}
for d, s := range d2s {
// No other Distinct values are equal to this.
for d2, s2 := range d2s {
if d2 == d {
continue
}
for _, elt := range s {
for _, otherDistinct := range s2 {
require.NotEqual(t, otherDistinct, elt)
}
}
}
for _, distincts := range s2d {
if distincts[0] == d {
continue
}
for _, otherDistinct := range distincts {
require.NotEqual(t, otherDistinct, d)
}
}
}
}

View File

@@ -39,7 +39,7 @@ type Config struct {
// Unit is an optional field describing the metric instrument.
Unit unit.Unit
// Resource describes the entity for which measurements are made.
Resource resource.Resource
Resource *resource.Resource
// LibraryName is the name given to the Meter that created
// this instrument. See `Provider`.
LibraryName string
@@ -134,7 +134,7 @@ func (d Descriptor) NumberKind() core.NumberKind {
// Resource returns the Resource describing the entity for which the metric
// instrument measures.
func (d Descriptor) Resource() resource.Resource {
func (d Descriptor) Resource() *resource.Resource {
return d.config.Resource
}
@@ -170,11 +170,11 @@ type Meter interface {
// RegisterInt64Observer creates a new integral observer with a
// given name, running a given callback, and customized with passed
// options. Callback can be nil.
// options. Callback may be nil.
RegisterInt64Observer(name string, callback Int64ObserverCallback, opts ...Option) (Int64Observer, error)
// RegisterFloat64Observer creates a new floating point observer
// with a given name, running a given callback, and customized with
// passed options. Callback can be nil.
// passed options. Callback may be nil.
RegisterFloat64Observer(name string, callback Float64ObserverCallback, opts ...Option) (Float64Observer, error)
}
@@ -203,14 +203,14 @@ func (u unitOption) Apply(config *Config) {
// WithResource applies provided Resource.
//
// This will override any existing Resource.
func WithResource(r resource.Resource) Option {
return resourceOption(r)
func WithResource(r *resource.Resource) Option {
return resourceOption{r}
}
type resourceOption resource.Resource
type resourceOption struct{ *resource.Resource }
func (r resourceOption) Apply(config *Config) {
config.Resource = resource.Resource(r)
config.Resource = r.Resource
}
// WithLibraryName applies provided library name. This is meant for

View File

@@ -40,7 +40,7 @@ func TestOptions(t *testing.T) {
opts []metric.Option
desc string
unit unit.Unit
resource resource.Resource
resource *resource.Resource
}
testcases := []testcase{
{
@@ -48,7 +48,7 @@ func TestOptions(t *testing.T) {
opts: nil,
desc: "",
unit: "",
resource: resource.Resource{},
resource: nil,
},
{
name: "description",
@@ -57,7 +57,7 @@ func TestOptions(t *testing.T) {
},
desc: "stuff",
unit: "",
resource: resource.Resource{},
resource: nil,
},
{
name: "description override",
@@ -67,7 +67,7 @@ func TestOptions(t *testing.T) {
},
desc: "things",
unit: "",
resource: resource.Resource{},
resource: nil,
},
{
name: "unit",
@@ -76,7 +76,7 @@ func TestOptions(t *testing.T) {
},
desc: "",
unit: "s",
resource: resource.Resource{},
resource: nil,
},
{
name: "unit override",
@@ -86,16 +86,16 @@ func TestOptions(t *testing.T) {
},
desc: "",
unit: "h",
resource: resource.Resource{},
resource: nil,
},
{
name: "resource override",
opts: []metric.Option{
metric.WithResource(*resource.New(key.New("name").String("test-name"))),
metric.WithResource(resource.New(key.New("name").String("test-name"))),
},
desc: "",
unit: "",
resource: *resource.New(key.New("name").String("test-name")),
resource: resource.New(key.New("name").String("test-name")),
},
}
for idx, tt := range testcases {

View File

@@ -127,7 +127,7 @@ func Configure(opts []Option) Config {
// The Resource method is used to set the Resource for Descriptors of new
// metric instruments.
type Resourcer interface {
Resource() resource.Resource
Resource() *resource.Resource
}
// insertResource inserts a WithResource option at the beginning of opts

View File

@@ -25,6 +25,7 @@ import (
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/label"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
@@ -158,7 +159,7 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, h
// it could try again on the next scrape and no data would be lost, only resolution.
//
// Gauges (or LastValues) and Summaries are an exception to this and have different behaviors.
batcher := ungrouped.New(selector, export.NewDefaultLabelEncoder(), true)
batcher := ungrouped.New(selector, label.DefaultEncoder(), true)
pusher := push.New(batcher, exporter, period)
pusher.Start()
@@ -340,7 +341,7 @@ func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
e.handler.ServeHTTP(w, r)
}
func labelsKeys(labels export.Labels) []string {
func labelsKeys(labels *label.Set) []string {
iter := labels.Iter()
keys := make([]string, 0, iter.Len())
for iter.Next() {
@@ -350,7 +351,7 @@ func labelsKeys(labels export.Labels) []string {
return keys
}
func labelValues(labels export.Labels) []string {
func labelValues(labels *label.Set) []string {
// TODO(paivagustavo): parse the labels.Encoded() instead of calling `Emit()` directly
// this would avoid unnecessary allocations.
iter := labels.Iter()

View File

@@ -26,10 +26,10 @@ import (
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/prometheus"
"go.opentelemetry.io/otel/exporters/metric/test"
export "go.opentelemetry.io/otel/sdk/export/metric"
)
func TestPrometheusExporter(t *testing.T) {
@@ -41,7 +41,7 @@ func TestPrometheusExporter(t *testing.T) {
}
var expected []string
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
counter := metric.NewDescriptor(
"counter", metric.CounterKind, core.Float64NumberKind)

View File

@@ -24,6 +24,7 @@ import (
"time"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/label"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
@@ -61,7 +62,7 @@ type Config struct {
Quantiles []float64
// LabelEncoder encodes the labels
LabelEncoder export.LabelEncoder
LabelEncoder label.Encoder
}
type expoBatch struct {
@@ -103,7 +104,7 @@ func NewRawExporter(config Config) (*Exporter, error) {
}
}
if config.LabelEncoder == nil {
config.LabelEncoder = export.NewDefaultLabelEncoder()
config.LabelEncoder = label.DefaultEncoder()
}
return &Exporter{
config: config,

View File

@@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/stdout"
"go.opentelemetry.io/otel/exporters/metric/test"
@@ -93,7 +94,7 @@ func TestStdoutTimestamp(t *testing.T) {
before := time.Now()
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
ctx := context.Background()
desc := metric.NewDescriptor("test.name", metric.ObserverKind, core.Int64NumberKind)
@@ -139,7 +140,7 @@ func TestStdoutTimestamp(t *testing.T) {
func TestStdoutCounterFormat(t *testing.T) {
fix := newFixture(t, stdout.Config{})
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
desc := metric.NewDescriptor("test.name", metric.CounterKind, core.Int64NumberKind)
cagg := sum.New()
@@ -156,7 +157,7 @@ func TestStdoutCounterFormat(t *testing.T) {
func TestStdoutLastValueFormat(t *testing.T) {
fix := newFixture(t, stdout.Config{})
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
desc := metric.NewDescriptor("test.name", metric.ObserverKind, core.Float64NumberKind)
lvagg := lastvalue.New()
@@ -173,7 +174,7 @@ func TestStdoutLastValueFormat(t *testing.T) {
func TestStdoutMinMaxSumCount(t *testing.T) {
fix := newFixture(t, stdout.Config{})
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
desc := metric.NewDescriptor("test.name", metric.MeasureKind, core.Float64NumberKind)
magg := minmaxsumcount.New(&desc)
@@ -193,7 +194,7 @@ func TestStdoutMeasureFormat(t *testing.T) {
PrettyPrint: true,
})
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
desc := metric.NewDescriptor("test.name", metric.MeasureKind, core.Float64NumberKind)
magg := array.New()
@@ -247,7 +248,7 @@ func TestStdoutNoData(t *testing.T) {
fix := newFixture(t, stdout.Config{})
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
magg := tc
magg.Checkpoint(fix.ctx, &desc)
@@ -264,7 +265,7 @@ func TestStdoutNoData(t *testing.T) {
func TestStdoutLastValueNotSet(t *testing.T) {
fix := newFixture(t, stdout.Config{})
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
desc := metric.NewDescriptor("test.name", metric.ObserverKind, core.Float64NumberKind)
lvagg := lastvalue.New()

View File

@@ -19,6 +19,7 @@ import (
"errors"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
@@ -29,14 +30,14 @@ import (
)
type CheckpointSet struct {
encoder export.LabelEncoder
encoder label.Encoder
records map[string]export.Record
updates []export.Record
}
// NewCheckpointSet returns a test CheckpointSet that new records could be added.
// Records are grouped by their encoded labels.
func NewCheckpointSet(encoder export.LabelEncoder) *CheckpointSet {
func NewCheckpointSet(encoder label.Encoder) *CheckpointSet {
return &CheckpointSet{
encoder: encoder,
records: make(map[string]export.Record),
@@ -53,14 +54,14 @@ func (p *CheckpointSet) Reset() {
// If there is an existing record with the same descriptor and labels,
// the stored aggregator will be returned and should be merged.
func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, labels ...core.KeyValue) (agg export.Aggregator, added bool) {
elabels := export.NewSimpleLabels(p.encoder, labels...)
elabels := label.NewSet(labels...)
key := desc.Name() + "_" + elabels.Encoded(p.encoder)
if record, ok := p.records[key]; ok {
return record.Aggregator(), false
}
rec := export.NewRecord(desc, elabels, newAgg)
rec := export.NewRecord(desc, &elabels, newAgg)
p.updates = append(p.updates, rec)
p.records[key] = rec
return newAgg, true

View File

@@ -18,6 +18,7 @@ import (
commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/sdk/resource"
)
// Attributes transforms a slice of KeyValues into a slice of OTLP attribute key-values.
@@ -27,40 +28,63 @@ func Attributes(attrs []core.KeyValue) []*commonpb.AttributeKeyValue {
}
out := make([]*commonpb.AttributeKeyValue, 0, len(attrs))
for _, v := range attrs {
switch v.Value.Type() {
case core.BOOL:
out = append(out, &commonpb.AttributeKeyValue{
Key: string(v.Key),
Type: commonpb.AttributeKeyValue_BOOL,
BoolValue: v.Value.AsBool(),
})
case core.INT64, core.INT32, core.UINT32, core.UINT64:
out = append(out, &commonpb.AttributeKeyValue{
Key: string(v.Key),
Type: commonpb.AttributeKeyValue_INT,
IntValue: v.Value.AsInt64(),
})
case core.FLOAT32:
f32 := v.Value.AsFloat32()
out = append(out, &commonpb.AttributeKeyValue{
Key: string(v.Key),
Type: commonpb.AttributeKeyValue_DOUBLE,
DoubleValue: float64(f32),
})
case core.FLOAT64:
out = append(out, &commonpb.AttributeKeyValue{
Key: string(v.Key),
Type: commonpb.AttributeKeyValue_DOUBLE,
DoubleValue: v.Value.AsFloat64(),
})
case core.STRING:
out = append(out, &commonpb.AttributeKeyValue{
Key: string(v.Key),
Type: commonpb.AttributeKeyValue_STRING,
StringValue: v.Value.AsString(),
})
}
for _, kv := range attrs {
out = append(out, toAttribute(kv))
}
return out
}
// ResourceAttributes transforms a Resource into a slice of OTLP attribute key-values.
func ResourceAttributes(resource *resource.Resource) []*commonpb.AttributeKeyValue {
if resource.Len() == 0 {
return nil
}
out := make([]*commonpb.AttributeKeyValue, 0, resource.Len())
for iter := resource.Iter(); iter.Next(); {
out = append(out, toAttribute(iter.Attribute()))
}
return out
}
func toAttribute(v core.KeyValue) *commonpb.AttributeKeyValue {
switch v.Value.Type() {
case core.BOOL:
return &commonpb.AttributeKeyValue{
Key: string(v.Key),
Type: commonpb.AttributeKeyValue_BOOL,
BoolValue: v.Value.AsBool(),
}
case core.INT64, core.INT32, core.UINT32, core.UINT64:
return &commonpb.AttributeKeyValue{
Key: string(v.Key),
Type: commonpb.AttributeKeyValue_INT,
IntValue: v.Value.AsInt64(),
}
case core.FLOAT32:
return &commonpb.AttributeKeyValue{
Key: string(v.Key),
Type: commonpb.AttributeKeyValue_DOUBLE,
DoubleValue: float64(v.Value.AsFloat32()),
}
case core.FLOAT64:
return &commonpb.AttributeKeyValue{
Key: string(v.Key),
Type: commonpb.AttributeKeyValue_DOUBLE,
DoubleValue: v.Value.AsFloat64(),
}
case core.STRING:
return &commonpb.AttributeKeyValue{
Key: string(v.Key),
Type: commonpb.AttributeKeyValue_STRING,
StringValue: v.Value.AsString(),
}
default:
return &commonpb.AttributeKeyValue{
Key: string(v.Key),
Type: commonpb.AttributeKeyValue_STRING,
StringValue: "INVALID",
}
}
}

View File

@@ -28,6 +28,7 @@ import (
resourcepb "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
@@ -53,7 +54,7 @@ var (
// result is the product of transforming Records into OTLP Metrics.
type result struct {
Resource resource.Resource
Resource *resource.Resource
Library string
Metric *metricpb.Metric
Err error
@@ -152,18 +153,18 @@ func sink(ctx context.Context, in <-chan result) ([]*metricpb.ResourceMetrics, e
}
// group by unique Resource string.
grouped := make(map[string]resourceBatch)
grouped := make(map[label.Distinct]resourceBatch)
for res := range in {
if res.Err != nil {
errStrings = append(errStrings, res.Err.Error())
continue
}
rID := res.Resource.String()
rID := res.Resource.Equivalent()
rb, ok := grouped[rID]
if !ok {
rb = resourceBatch{
Resource: Resource(&res.Resource),
Resource: Resource(res.Resource),
InstrumentationLibraryBatches: make(map[string]map[string]*metricpb.Metric),
}
grouped[rID] = rb
@@ -240,7 +241,7 @@ func Record(r export.Record) (*metricpb.Metric, error) {
}
// sum transforms a Sum Aggregator into an OTLP Metric.
func sum(desc *metric.Descriptor, labels export.Labels, a aggregator.Sum) (*metricpb.Metric, error) {
func sum(desc *metric.Descriptor, labels *label.Set, a aggregator.Sum) (*metricpb.Metric, error) {
sum, err := a.Sum()
if err != nil {
return nil, err
@@ -292,7 +293,7 @@ func minMaxSumCountValues(a aggregator.MinMaxSumCount) (min, max, sum core.Numbe
}
// minMaxSumCount transforms a MinMaxSumCount Aggregator into an OTLP Metric.
func minMaxSumCount(desc *metric.Descriptor, labels export.Labels, a aggregator.MinMaxSumCount) (*metricpb.Metric, error) {
func minMaxSumCount(desc *metric.Descriptor, labels *label.Set, a aggregator.MinMaxSumCount) (*metricpb.Metric, error) {
min, max, sum, count, err := minMaxSumCountValues(a)
if err != nil {
return nil, err
@@ -327,7 +328,7 @@ func minMaxSumCount(desc *metric.Descriptor, labels export.Labels, a aggregator.
}
// stringKeyValues transforms a label iterator into an OTLP StringKeyValues.
func stringKeyValues(iter export.LabelIterator) []*commonpb.StringKeyValue {
func stringKeyValues(iter label.Iterator) []*commonpb.StringKeyValue {
l := iter.Len()
if l == 0 {
return nil

View File

@@ -25,9 +25,9 @@ import (
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/unit"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
sumAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
@@ -60,23 +60,23 @@ func TestStringKeyValues(t *testing.T) {
key.String("the", "final word"),
},
[]*commonpb.StringKeyValue{
{Key: "true", Value: "true"},
{Key: "one", Value: "1"},
{Key: "two", Value: "2"},
{Key: "three", Value: "3"},
{Key: "four", Value: "4"},
{Key: "five", Value: "5"},
{Key: "six", Value: "6"},
{Key: "seven", Value: "7"},
{Key: "eight", Value: "8"},
{Key: "five", Value: "5"},
{Key: "four", Value: "4"},
{Key: "one", Value: "1"},
{Key: "seven", Value: "7"},
{Key: "six", Value: "6"},
{Key: "the", Value: "final word"},
{Key: "three", Value: "3"},
{Key: "true", Value: "true"},
{Key: "two", Value: "2"},
},
},
}
for _, test := range tests {
iter := export.LabelSlice(test.kvs).Iter()
assert.Equal(t, test.expected, stringKeyValues(iter))
labels := label.NewSet(test.kvs...)
assert.Equal(t, test.expected, stringKeyValues(labels.Iter()))
}
}
@@ -152,8 +152,8 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
desc := metric.NewDescriptor(test.name, test.metricKind, test.numberKind,
metric.WithDescription(test.description),
metric.WithUnit(test.unit))
labels := export.NewSimpleLabels(export.NoopLabelEncoder{}, test.labels...)
got, err := minMaxSumCount(&desc, labels, mmsc)
labels := label.NewSet(test.labels...)
got, err := minMaxSumCount(&desc, &labels, mmsc)
if assert.NoError(t, err) {
assert.Equal(t, test.expected, got.MetricDescriptor)
}
@@ -162,7 +162,7 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
func TestMinMaxSumCountDatapoints(t *testing.T) {
desc := metric.NewDescriptor("", metric.MeasureKind, core.Int64NumberKind)
labels := export.NewSimpleLabels(export.NoopLabelEncoder{})
labels := label.NewSet()
mmsc := minmaxsumcount.New(&desc)
assert.NoError(t, mmsc.Update(context.Background(), 1, &desc))
assert.NoError(t, mmsc.Update(context.Background(), 10, &desc))
@@ -183,7 +183,7 @@ func TestMinMaxSumCountDatapoints(t *testing.T) {
},
},
}
m, err := minMaxSumCount(&desc, labels, mmsc)
m, err := minMaxSumCount(&desc, &labels, mmsc)
if assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint(nil), m.DoubleDataPoints)
@@ -249,8 +249,8 @@ func TestSumMetricDescriptor(t *testing.T) {
metric.WithDescription(test.description),
metric.WithUnit(test.unit),
)
labels := export.NewSimpleLabels(export.NoopLabelEncoder{}, test.labels...)
got, err := sum(&desc, labels, sumAgg.New())
labels := label.NewSet(test.labels...)
got, err := sum(&desc, &labels, sumAgg.New())
if assert.NoError(t, err) {
assert.Equal(t, test.expected, got.MetricDescriptor)
}
@@ -259,11 +259,11 @@ func TestSumMetricDescriptor(t *testing.T) {
func TestSumInt64DataPoints(t *testing.T) {
desc := metric.NewDescriptor("", metric.MeasureKind, core.Int64NumberKind)
labels := export.NewSimpleLabels(export.NoopLabelEncoder{})
labels := label.NewSet()
s := sumAgg.New()
assert.NoError(t, s.Update(context.Background(), core.Number(1), &desc))
s.Checkpoint(context.Background(), &desc)
if m, err := sum(&desc, labels, s); assert.NoError(t, err) {
if m, err := sum(&desc, &labels, s); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint{{Value: 1}}, m.Int64DataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint(nil), m.DoubleDataPoints)
assert.Equal(t, []*metricpb.HistogramDataPoint(nil), m.HistogramDataPoints)
@@ -273,11 +273,11 @@ func TestSumInt64DataPoints(t *testing.T) {
func TestSumFloat64DataPoints(t *testing.T) {
desc := metric.NewDescriptor("", metric.MeasureKind, core.Float64NumberKind)
labels := export.NewSimpleLabels(export.NoopLabelEncoder{})
labels := label.NewSet()
s := sumAgg.New()
assert.NoError(t, s.Update(context.Background(), core.NewFloat64Number(1), &desc))
s.Checkpoint(context.Background(), &desc)
if m, err := sum(&desc, labels, s); assert.NoError(t, err) {
if m, err := sum(&desc, &labels, s); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint{{Value: 1}}, m.DoubleDataPoints)
assert.Equal(t, []*metricpb.HistogramDataPoint(nil), m.HistogramDataPoints)
@@ -287,9 +287,9 @@ func TestSumFloat64DataPoints(t *testing.T) {
func TestSumErrUnknownValueType(t *testing.T) {
desc := metric.NewDescriptor("", metric.MeasureKind, core.NumberKind(-1))
labels := export.NewSimpleLabels(export.NoopLabelEncoder{})
labels := label.NewSet()
s := sumAgg.New()
_, err := sum(&desc, labels, s)
_, err := sum(&desc, &labels, s)
assert.Error(t, err)
if !errors.Is(err, ErrUnknownValueType) {
t.Errorf("expected ErrUnknownValueType, got %v", err)

View File

@@ -25,5 +25,5 @@ func Resource(r *resource.Resource) *resourcepb.Resource {
if r == nil {
return nil
}
return &resourcepb.Resource{Attributes: Attributes(r.Attributes())}
return &resourcepb.Resource{Attributes: ResourceAttributes(r)}
}

View File

@@ -19,6 +19,7 @@ import (
tracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1"
"go.opentelemetry.io/otel/api/label"
apitrace "go.opentelemetry.io/otel/api/trace"
export "go.opentelemetry.io/otel/sdk/export/trace"
)
@@ -32,15 +33,12 @@ func SpanData(sdl []*export.SpanData) []*tracepb.ResourceSpans {
if len(sdl) == 0 {
return nil
}
// Group by the unique string representation of the Resource.
rsm := make(map[string]*tracepb.ResourceSpans)
// Group by the distinct representation of the Resource.
rsm := make(map[label.Distinct]*tracepb.ResourceSpans)
for _, sd := range sdl {
if sd != nil {
var key string
if sd.Resource != nil {
key = sd.Resource.String()
}
key := sd.Resource.Equivalent()
rs, ok := rsm[key]
if !ok {

View File

@@ -28,10 +28,10 @@ import (
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
metricapi "go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/otlp"
exportmetric "go.opentelemetry.io/otel/sdk/export/metric"
exporttrace "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
@@ -112,7 +112,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
}
selector := simple.NewWithExactMeasure()
batcher := ungrouped.New(selector, exportmetric.NewDefaultLabelEncoder(), true)
batcher := ungrouped.New(selector, label.DefaultEncoder(), true)
pusher := push.New(batcher, exp, 60*time.Second)
pusher.Start()

View File

@@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
@@ -91,28 +92,28 @@ var (
Name: "int64-count",
Type: metricpb.MetricDescriptor_COUNTER_INT64,
Labels: []*commonpb.StringKeyValue{
{
Key: "host",
Value: "test.com",
},
{
Key: "CPU",
Value: "1",
},
{
Key: "host",
Value: "test.com",
},
},
}
cpu2MD = &metricpb.MetricDescriptor{
Name: "int64-count",
Type: metricpb.MetricDescriptor_COUNTER_INT64,
Labels: []*commonpb.StringKeyValue{
{
Key: "host",
Value: "test.com",
},
{
Key: "CPU",
Value: "2",
},
{
Key: "host",
Value: "test.com",
},
},
}
@@ -157,7 +158,7 @@ func TestNoGroupingExport(t *testing.T) {
},
[]metricpb.ResourceMetrics{
{
Resource: &resourcepb.Resource{},
Resource: nil,
InstrumentationLibraryMetrics: []*metricpb.InstrumentationLibraryMetrics{
{
Metrics: []*metricpb.Metric{
@@ -195,7 +196,7 @@ func TestMeasureMetricGroupingExport(t *testing.T) {
}
expected := []metricpb.ResourceMetrics{
{
Resource: &resourcepb.Resource{},
Resource: nil,
InstrumentationLibraryMetrics: []*metricpb.InstrumentationLibraryMetrics{
{
Metrics: []*metricpb.Metric{
@@ -204,14 +205,14 @@ func TestMeasureMetricGroupingExport(t *testing.T) {
Name: "measure",
Type: metricpb.MetricDescriptor_SUMMARY,
Labels: []*commonpb.StringKeyValue{
{
Key: "host",
Value: "test.com",
},
{
Key: "CPU",
Value: "1",
},
{
Key: "host",
Value: "test.com",
},
},
},
SummaryDataPoints: []*metricpb.SummaryDataPoint{
@@ -271,7 +272,7 @@ func TestCountInt64MetricGroupingExport(t *testing.T) {
[]record{r, r},
[]metricpb.ResourceMetrics{
{
Resource: &resourcepb.Resource{},
Resource: nil,
InstrumentationLibraryMetrics: []*metricpb.InstrumentationLibraryMetrics{
{
Metrics: []*metricpb.Metric{
@@ -307,7 +308,7 @@ func TestCountUint64MetricGroupingExport(t *testing.T) {
[]record{r, r},
[]metricpb.ResourceMetrics{
{
Resource: &resourcepb.Resource{},
Resource: nil,
InstrumentationLibraryMetrics: []*metricpb.InstrumentationLibraryMetrics{
{
Metrics: []*metricpb.Metric{
@@ -316,14 +317,14 @@ func TestCountUint64MetricGroupingExport(t *testing.T) {
Name: "uint64-count",
Type: metricpb.MetricDescriptor_COUNTER_INT64,
Labels: []*commonpb.StringKeyValue{
{
Key: "host",
Value: "test.com",
},
{
Key: "CPU",
Value: "1",
},
{
Key: "host",
Value: "test.com",
},
},
},
Int64DataPoints: []*metricpb.Int64DataPoint{
@@ -356,7 +357,7 @@ func TestCountFloat64MetricGroupingExport(t *testing.T) {
[]record{r, r},
[]metricpb.ResourceMetrics{
{
Resource: &resourcepb.Resource{},
Resource: nil,
InstrumentationLibraryMetrics: []*metricpb.InstrumentationLibraryMetrics{
{
Metrics: []*metricpb.Metric{
@@ -365,14 +366,14 @@ func TestCountFloat64MetricGroupingExport(t *testing.T) {
Name: "float64-count",
Type: metricpb.MetricDescriptor_COUNTER_DOUBLE,
Labels: []*commonpb.StringKeyValue{
{
Key: "host",
Value: "test.com",
},
{
Key: "CPU",
Value: "1",
},
{
Key: "host",
Value: "test.com",
},
},
},
DoubleDataPoints: []*metricpb.DoubleDataPoint{
@@ -400,28 +401,28 @@ func TestResourceMetricGroupingExport(t *testing.T) {
"int64-count",
metric.CounterKind,
core.Int64NumberKind,
[]metric.Option{metric.WithResource(*testInstA)},
[]metric.Option{metric.WithResource(testInstA)},
append(baseKeyValues, cpuKey.Int(1)),
},
{
"int64-count",
metric.CounterKind,
core.Int64NumberKind,
[]metric.Option{metric.WithResource(*testInstA)},
[]metric.Option{metric.WithResource(testInstA)},
append(baseKeyValues, cpuKey.Int(1)),
},
{
"int64-count",
metric.CounterKind,
core.Int64NumberKind,
[]metric.Option{metric.WithResource(*testInstA)},
[]metric.Option{metric.WithResource(testInstA)},
append(baseKeyValues, cpuKey.Int(2)),
},
{
"int64-count",
metric.CounterKind,
core.Int64NumberKind,
[]metric.Option{metric.WithResource(*testInstB)},
[]metric.Option{metric.WithResource(testInstB)},
append(baseKeyValues, cpuKey.Int(1)),
},
},
@@ -484,7 +485,7 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
metric.CounterKind,
core.Int64NumberKind,
[]metric.Option{
metric.WithResource(*testInstA),
metric.WithResource(testInstA),
metric.WithLibraryName("couting-lib"),
},
append(baseKeyValues, cpuKey.Int(1)),
@@ -494,7 +495,7 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
metric.CounterKind,
core.Int64NumberKind,
[]metric.Option{
metric.WithResource(*testInstA),
metric.WithResource(testInstA),
metric.WithLibraryName("couting-lib"),
},
append(baseKeyValues, cpuKey.Int(1)),
@@ -504,7 +505,7 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
metric.CounterKind,
core.Int64NumberKind,
[]metric.Option{
metric.WithResource(*testInstA),
metric.WithResource(testInstA),
metric.WithLibraryName("couting-lib"),
},
append(baseKeyValues, cpuKey.Int(2)),
@@ -514,7 +515,7 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
metric.CounterKind,
core.Int64NumberKind,
[]metric.Option{
metric.WithResource(*testInstA),
metric.WithResource(testInstA),
metric.WithLibraryName("summing-lib"),
},
append(baseKeyValues, cpuKey.Int(1)),
@@ -524,7 +525,7 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
metric.CounterKind,
core.Int64NumberKind,
[]metric.Option{
metric.WithResource(*testInstB),
metric.WithResource(testInstB),
metric.WithLibraryName("couting-lib"),
},
append(baseKeyValues, cpuKey.Int(1)),
@@ -619,7 +620,7 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
var recs []metricsdk.Record
for _, r := range rs {
desc := metric.NewDescriptor(r.name, r.mKind, r.nKind, r.opts...)
labs := metricsdk.NewSimpleLabels(metricsdk.NewDefaultLabelEncoder(), r.labels...)
labs := label.NewSet(r.labels...)
var agg metricsdk.Aggregator
switch r.mKind {
@@ -645,7 +646,7 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
}
agg.Checkpoint(ctx, &desc)
recs = append(recs, metricsdk.NewRecord(&desc, labs, agg))
recs = append(recs, metricsdk.NewRecord(&desc, &labs, agg))
}
assert.NoError(t, exp.Export(context.Background(), checkpointSet{records: recs}))

View File

@@ -204,12 +204,13 @@ func spanDataToThrift(data *export.SpanData) *gen.Span {
}
}
// TODO (rghetia): what to do if a resource key is the same as one of the attribute's key
// TODO (rghetia): is there a need for prefixing keys with "resource-"?
// TODO (jmacd): OTel has a broad "last value wins"
// semantic. Should resources be appended before span
// attributes, above, to allow span attributes to
// overwrite resource attributes?
if data.Resource != nil {
for _, kv := range data.Resource.Attributes() {
tag := keyValueToTag(kv)
if tag != nil {
for iter := data.Resource.Iter(); iter.Next(); {
if tag := keyValueToTag(iter.Attribute()); tag != nil {
tags = append(tags, tag)
}
}

View File

@@ -1,95 +0,0 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"bytes"
"sync"
"go.opentelemetry.io/otel/api/core"
)
// escapeChar is used to ensure uniqueness of the label encoding where
// keys or values contain either '=' or ','. Since there is no parser
// needed for this encoding and its only requirement is to be unique,
// this choice is arbitrary. Users will see these in some exporters
// (e.g., stdout), so the backslash ('\') is used a conventional choice.
const escapeChar = '\\'
type defaultLabelEncoder struct {
// pool is a pool of labelset builders. The buffers in this
// pool grow to a size that most label encodings will not
// allocate new memory.
pool sync.Pool // *bytes.Buffer
}
var _ LabelEncoder = &defaultLabelEncoder{}
// NewDefaultLabelEncoder returns a label encoder that encodes labels
// in such a way that each escaped label's key is followed by an equal
// sign and then by an escaped label's value. All key-value pairs are
// separated by a comma.
//
// Escaping is done by prepending a backslash before either a
// backslash, equal sign or a comma.
func NewDefaultLabelEncoder() LabelEncoder {
return &defaultLabelEncoder{
pool: sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
},
}
}
// Encode is a part of an implementation of the LabelEncoder
// interface.
func (d *defaultLabelEncoder) Encode(iter LabelIterator) string {
buf := d.pool.Get().(*bytes.Buffer)
defer d.pool.Put(buf)
buf.Reset()
for iter.Next() {
i, kv := iter.IndexedLabel()
if i > 0 {
_, _ = buf.WriteRune(',')
}
copyAndEscape(buf, string(kv.Key))
_, _ = buf.WriteRune('=')
if kv.Value.Type() == core.STRING {
copyAndEscape(buf, kv.Value.AsString())
} else {
_, _ = buf.WriteString(kv.Value.Emit())
}
}
return buf.String()
}
// ID is a part of an implementation of the LabelEncoder interface.
func (*defaultLabelEncoder) ID() int64 {
return defaultLabelEncoderID
}
func copyAndEscape(buf *bytes.Buffer, val string) {
for _, ch := range val {
switch ch {
case '=', ',', escapeChar:
buf.WriteRune(escapeChar)
}
buf.WriteRune(ch)
}
}

View File

@@ -16,33 +16,12 @@ package metric // import "go.opentelemetry.io/otel/sdk/export/metric"
import (
"context"
"sync/atomic"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
)
const (
// reserved ID for the noop label encoder
noopLabelEncoderID int64 = 1 + iota
// reserved ID for the default label encoder
defaultLabelEncoderID
// this must come last in enumeration
lastLabelEncoderID
)
// labelEncoderIDCounter is for generating IDs for other label
// encoders.
var labelEncoderIDCounter int64 = lastLabelEncoderID
// NewLabelEncoderID returns a unique label encoder ID. It should be
// called once per each type of label encoder. Preferably in init() or
// in var definition.
func NewLabelEncoderID() int64 {
return atomic.AddInt64(&labelEncoderIDCounter, 1)
}
// Batcher is responsible for deciding which kind of aggregation to
// use (via AggregationSelector), gathering exported results from the
// SDK during collection, and deciding over which dimensions to group
@@ -186,119 +165,6 @@ type Exporter interface {
Export(context.Context, CheckpointSet) error
}
// LabelStorage provides an access to the ordered labels.
type LabelStorage interface {
// NumLabels returns a number of labels in the storage.
NumLabels() int
// GetLabels gets a label from a passed index.
GetLabel(int) core.KeyValue
}
// LabelSlice implements LabelStorage in terms of a slice.
type LabelSlice []core.KeyValue
var _ LabelStorage = LabelSlice{}
// NumLabels is a part of LabelStorage implementation.
func (s LabelSlice) NumLabels() int {
return len(s)
}
// GetLabel is a part of LabelStorage implementation.
func (s LabelSlice) GetLabel(idx int) core.KeyValue {
return s[idx]
}
// Iter returns an iterator going over the slice.
func (s LabelSlice) Iter() LabelIterator {
return NewLabelIterator(s)
}
// LabelIterator allows iterating over an ordered set of labels. The
// typical use of the iterator is as follows:
//
// iter := export.NewLabelIterator(getStorage())
// for iter.Next() {
// label := iter.Label()
// // or, if we need an index:
// // idx, label := iter.IndexedLabel()
// // do something with label
// }
type LabelIterator struct {
storage LabelStorage
idx int
}
// NewLabelIterator creates an iterator going over a passed storage.
func NewLabelIterator(storage LabelStorage) LabelIterator {
return LabelIterator{
storage: storage,
idx: -1,
}
}
// Next moves the iterator to the next label. Returns false if there
// are no more labels.
func (i *LabelIterator) Next() bool {
i.idx++
return i.idx < i.Len()
}
// Label returns current label. Must be called only after Next returns
// true.
func (i *LabelIterator) Label() core.KeyValue {
return i.storage.GetLabel(i.idx)
}
// IndexedLabel returns current index and label. Must be called only
// after Next returns true.
func (i *LabelIterator) IndexedLabel() (int, core.KeyValue) {
return i.idx, i.Label()
}
// Len returns a number of labels in the iterator's label storage.
func (i *LabelIterator) Len() int {
return i.storage.NumLabels()
}
// Convenience function that creates a slice of labels from the passed
// iterator. The iterator is set up to start from the beginning before
// creating the slice.
func IteratorToSlice(iter LabelIterator) []core.KeyValue {
l := iter.Len()
if l == 0 {
return nil
}
iter.idx = -1
slice := make([]core.KeyValue, 0, l)
for iter.Next() {
slice = append(slice, iter.Label())
}
return slice
}
// LabelEncoder enables an optimization for export pipelines that use
// text to encode their label sets.
//
// This interface allows configuring the encoder used in the Batcher
// so that by the time the exporter is called, the same encoding may
// be used.
type LabelEncoder interface {
// Encode is called (concurrently) in instrumentation context.
//
// The expectation is that when setting up an export pipeline
// both the batcher and the exporter will use the same label
// encoder to avoid the duplicate computation of the encoded
// labels in the export path.
Encode(LabelIterator) string
// ID should return a unique positive number associated with
// the label encoder. Stateless label encoders could return
// the same number regardless of an instance, stateful label
// encoders should return a number depending on their state.
ID() int64
}
// CheckpointSet allows a controller to access a complete checkpoint of
// aggregated metrics from the Batcher. This is passed to the
// Exporter which may then use ForEach to iterate over the collection
@@ -319,56 +185,14 @@ type CheckpointSet interface {
// and label set.
type Record struct {
descriptor *metric.Descriptor
labels Labels
labels *label.Set
aggregator Aggregator
}
// Labels stores complete information about a computed label set,
// including the labels in an appropriate order (as defined by the
// Batcher). If the batcher does not re-order labels, they are
// presented in sorted order by the SDK.
type Labels interface {
Iter() LabelIterator
Encoded(LabelEncoder) string
}
type labels struct {
encoderID int64
encoded string
slice LabelSlice
}
var _ Labels = &labels{}
// NewSimpleLabels builds a Labels object, consisting of an ordered
// set of labels in a provided slice and a unique encoded
// representation generated by the passed encoder.
func NewSimpleLabels(encoder LabelEncoder, kvs ...core.KeyValue) Labels {
l := &labels{
encoderID: encoder.ID(),
slice: kvs,
}
l.encoded = encoder.Encode(l.Iter())
return l
}
// Iter is a part of an implementation of the Labels interface.
func (l *labels) Iter() LabelIterator {
return l.slice.Iter()
}
// Encoded is a part of an implementation of the Labels interface.
func (l *labels) Encoded(encoder LabelEncoder) string {
if l.encoderID == encoder.ID() {
return l.encoded
}
return encoder.Encode(l.Iter())
}
// NewRecord allows Batcher implementations to construct export
// records. The Descriptor, Labels, and Aggregator represent
// aggregate metric events received over a single collection period.
func NewRecord(descriptor *metric.Descriptor, labels Labels, aggregator Aggregator) Record {
func NewRecord(descriptor *metric.Descriptor, labels *label.Set, aggregator Aggregator) Record {
return Record{
descriptor: descriptor,
labels: labels,
@@ -389,6 +213,6 @@ func (r Record) Descriptor() *metric.Descriptor {
// Labels describes the labels associated with the instrument and the
// aggregated data.
func (r Record) Labels() Labels {
func (r Record) Labels() *label.Set {
return r.labels
}

View File

@@ -19,6 +19,7 @@ import (
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/label"
"github.com/stretchr/testify/require"
)
@@ -28,8 +29,13 @@ var testSlice = []core.KeyValue{
key.Int("foo", 42),
}
func newIter(slice []core.KeyValue) label.Iterator {
labels := label.NewSet(slice...)
return labels.Iter()
}
func TestLabelIterator(t *testing.T) {
iter := LabelSlice(testSlice).Iter()
iter := newIter(testSlice)
require.Equal(t, 2, iter.Len())
require.True(t, iter.Next())
@@ -51,17 +57,17 @@ func TestLabelIterator(t *testing.T) {
}
func TestEmptyLabelIterator(t *testing.T) {
iter := LabelSlice(nil).Iter()
iter := newIter(nil)
require.Equal(t, 0, iter.Len())
require.False(t, iter.Next())
}
func TestIteratorToSlice(t *testing.T) {
iter := LabelSlice(testSlice).Iter()
got := IteratorToSlice(iter)
iter := newIter(testSlice)
got := iter.ToSlice()
require.Equal(t, testSlice, got)
iter = LabelSlice(nil).Iter()
got = IteratorToSlice(iter)
iter = newIter(nil)
got = iter.ToSlice()
require.Nil(t, got)
}

View File

@@ -1,31 +0,0 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
// NoopLabelEncoder does no encoding at all.
type NoopLabelEncoder struct{}
var _ LabelEncoder = NoopLabelEncoder{}
// Encode is a part of an implementation of the LabelEncoder
// interface. It returns an empty string.
func (NoopLabelEncoder) Encode(LabelIterator) string {
return ""
}
// ID is a part of an implementation of the LabelEncoder interface.
func (NoopLabelEncoder) ID() int64 {
return noopLabelEncoderID
}

View File

@@ -20,6 +20,5 @@ func AtomicFieldOffsets() map[string]uintptr {
return map[string]uintptr{
"record.refMapped.value": unsafe.Offsetof(record{}.refMapped.value),
"record.updateCount": unsafe.Offsetof(record{}.updateCount),
"record.labels.cachedEncoderID": unsafe.Offsetof(record{}.labels.cachedEncoded),
}
}

View File

@@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
@@ -35,7 +36,7 @@ type (
// Output collects distinct metric/label set outputs.
Output struct {
Map map[string]float64
labelEncoder export.LabelEncoder
labelEncoder label.Encoder
}
// testAggregationSelector returns aggregators consistent with
@@ -59,22 +60,22 @@ var (
// SdkEncoder uses a non-standard encoder like K1~V1&K2~V2
SdkEncoder = &Encoder{}
// GroupEncoder uses the SDK default encoder
GroupEncoder = export.NewDefaultLabelEncoder()
GroupEncoder = label.DefaultEncoder()
// LastValue groups are (labels1), (labels2+labels3)
// Counter groups are (labels1+labels2), (labels3)
// Labels1 has G=H and C=D
Labels1 = makeLabels(SdkEncoder, key.String("G", "H"), key.String("C", "D"))
Labels1 = makeLabels(key.String("G", "H"), key.String("C", "D"))
// Labels2 has C=D and E=F
Labels2 = makeLabels(SdkEncoder, key.String("C", "D"), key.String("E", "F"))
Labels2 = makeLabels(key.String("C", "D"), key.String("E", "F"))
// Labels3 is the empty set
Labels3 = makeLabels(SdkEncoder)
Labels3 = makeLabels()
leID = export.NewLabelEncoderID()
testLabelEncoderID = label.NewEncoderID()
)
func NewOutput(labelEncoder export.LabelEncoder) Output {
func NewOutput(labelEncoder label.Encoder) Output {
return Output{
Map: make(map[string]float64),
labelEncoder: labelEncoder,
@@ -99,11 +100,12 @@ func (*testAggregationSelector) AggregatorFor(desc *metric.Descriptor) export.Ag
}
}
func makeLabels(encoder export.LabelEncoder, labels ...core.KeyValue) export.Labels {
return export.NewSimpleLabels(encoder, labels...)
func makeLabels(labels ...core.KeyValue) *label.Set {
s := label.NewSet(labels...)
return &s
}
func (Encoder) Encode(iter export.LabelIterator) string {
func (Encoder) Encode(iter label.Iterator) string {
var sb strings.Builder
for iter.Next() {
i, l := iter.IndexedLabel()
@@ -117,8 +119,8 @@ func (Encoder) Encode(iter export.LabelIterator) string {
return sb.String()
}
func (Encoder) ID() int64 {
return leID
func (Encoder) ID() label.EncoderID {
return testLabelEncoderID
}
// LastValueAgg returns a checkpointed lastValue aggregator w/ the specified descriptor and value.
@@ -131,12 +133,12 @@ func LastValueAgg(desc *metric.Descriptor, v int64) export.Aggregator {
}
// Convenience method for building a test exported lastValue record.
func NewLastValueRecord(desc *metric.Descriptor, labels export.Labels, value int64) export.Record {
func NewLastValueRecord(desc *metric.Descriptor, labels *label.Set, value int64) export.Record {
return export.NewRecord(desc, labels, LastValueAgg(desc, value))
}
// Convenience method for building a test exported counter record.
func NewCounterRecord(desc *metric.Descriptor, labels export.Labels, value int64) export.Record {
func NewCounterRecord(desc *metric.Descriptor, labels *label.Set, value int64) export.Record {
return export.NewRecord(desc, labels, CounterAgg(desc, value))
}

View File

@@ -18,6 +18,7 @@ import (
"context"
"errors"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
@@ -28,7 +29,7 @@ type (
selector export.AggregationSelector
batchMap batchMap
stateful bool
labelEncoder export.LabelEncoder
labelEncoder label.Encoder
}
batchKey struct {
@@ -38,7 +39,7 @@ type (
batchValue struct {
aggregator export.Aggregator
labels export.Labels
labels *label.Set
}
batchMap map[batchKey]batchValue
@@ -47,7 +48,7 @@ type (
var _ export.Batcher = &Batcher{}
var _ export.CheckpointSet = batchMap{}
func New(selector export.AggregationSelector, labelEncoder export.LabelEncoder, stateful bool) *Batcher {
func New(selector export.AggregationSelector, labelEncoder label.Encoder, stateful bool) *Batcher {
return &Batcher{
selector: selector,
batchMap: batchMap{},

View File

@@ -67,16 +67,16 @@ func TestUngroupedStateless(t *testing.T) {
// Output lastvalue should have only the "G=H" and "G=" keys.
// Output counter should have only the "C=D" and "C=" keys.
require.EqualValues(t, map[string]float64{
"sum.a/G~H&C~D": 60, // labels1
"sum.a/C~D&G~H": 60, // labels1
"sum.a/C~D&E~F": 20, // labels2
"sum.a/": 40, // labels3
"sum.b/G~H&C~D": 60, // labels1
"sum.b/C~D&G~H": 60, // labels1
"sum.b/C~D&E~F": 20, // labels2
"sum.b/": 40, // labels3
"lastvalue.a/G~H&C~D": 50, // labels1
"lastvalue.a/C~D&G~H": 50, // labels1
"lastvalue.a/C~D&E~F": 20, // labels2
"lastvalue.a/": 30, // labels3
"lastvalue.b/G~H&C~D": 50, // labels1
"lastvalue.b/C~D&G~H": 50, // labels1
"lastvalue.b/C~D&E~F": 20, // labels2
"lastvalue.b/": 30, // labels3
}, records.Map)
@@ -109,8 +109,8 @@ func TestUngroupedStateful(t *testing.T) {
_ = checkpointSet.ForEach(records1.AddTo)
require.EqualValues(t, map[string]float64{
"sum.a/G~H&C~D": 10, // labels1
"sum.b/G~H&C~D": 10, // labels1
"sum.a/C~D&G~H": 10, // labels1
"sum.b/C~D&G~H": 10, // labels1
}, records1.Map)
// Test that state was NOT reset
@@ -149,7 +149,7 @@ func TestUngroupedStateful(t *testing.T) {
_ = checkpointSet.ForEach(records4.AddTo)
require.EqualValues(t, map[string]float64{
"sum.a/G~H&C~D": 30,
"sum.b/G~H&C~D": 30,
"sum.a/C~D&G~H": 30,
"sum.b/C~D&G~H": 30,
}, records4.Map)
}

View File

@@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
@@ -560,7 +561,7 @@ func BenchmarkBatchRecord_8Labels_8Instruments(b *testing.B) {
func BenchmarkRepeatedDirectCalls(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
encoder := export.NewDefaultLabelEncoder()
encoder := label.DefaultEncoder()
fix.pcb = func(_ context.Context, rec export.Record) error {
_ = rec.Labels().Encoded(encoder)
return nil

View File

@@ -26,7 +26,7 @@ type Config struct {
// Resource is the OpenTelemetry resource associated with all Meters
// created by the SDK.
Resource resource.Resource
Resource *resource.Resource
}
// Option is the interface that applies the value to a configuration option.
@@ -47,12 +47,12 @@ func (o errorHandlerOption) Apply(config *Config) {
}
// WithResource sets the Resource configuration option of a Config.
func WithResource(r resource.Resource) Option {
return resourceOption(r)
func WithResource(r *resource.Resource) Option {
return resourceOption{r}
}
type resourceOption resource.Resource
type resourceOption struct{ *resource.Resource }
func (o resourceOption) Apply(config *Config) {
config.Resource = resource.Resource(o)
config.Resource = o.Resource
}

View File

@@ -51,11 +51,11 @@ func TestWithResource(t *testing.T) {
r := resource.New(key.String("A", "a"))
c := &Config{}
WithResource(*r).Apply(c)
assert.Equal(t, *r, c.Resource)
WithResource(r).Apply(c)
assert.True(t, r.Equal(c.Resource))
// Ensure overwriting works.
c = &Config{Resource: resource.Resource{}}
WithResource(*r).Apply(c)
assert.Equal(t, *r, c.Resource)
c = &Config{Resource: &resource.Resource{}}
WithResource(r).Apply(c)
assert.Equal(t, r.Equivalent(), c.Resource.Equivalent())
}

View File

@@ -29,7 +29,7 @@ type Config struct {
// Resource is the OpenTelemetry resource associated with all Meters
// created by the Controller.
Resource resource.Resource
Resource *resource.Resource
}
// Option is the interface that applies the value to a configuration option.
@@ -50,12 +50,12 @@ func (o errorHandlerOption) Apply(config *Config) {
}
// WithResource sets the Resource configuration option of a Config.
func WithResource(r resource.Resource) Option {
return resourceOption(r)
func WithResource(r *resource.Resource) Option {
return resourceOption{r}
}
type resourceOption resource.Resource
type resourceOption struct{ *resource.Resource }
func (o resourceOption) Apply(config *Config) {
config.Resource = resource.Resource(o)
config.Resource = o.Resource
}

View File

@@ -52,11 +52,11 @@ func TestWithResource(t *testing.T) {
r := resource.New(key.String("A", "a"))
c := &Config{}
WithResource(*r).Apply(c)
assert.Equal(t, *r, c.Resource)
WithResource(r).Apply(c)
assert.Equal(t, r.Equivalent(), c.Resource.Equivalent())
// Ensure overwriting works.
c = &Config{Resource: resource.Resource{}}
WithResource(*r).Apply(c)
assert.Equal(t, *r, c.Resource)
c = &Config{Resource: &resource.Resource{}}
WithResource(r).Apply(c)
assert.Equal(t, r.Equivalent(), c.Resource.Equivalent())
}

View File

@@ -25,6 +25,7 @@ import (
"github.com/benbjohnson/clock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/test"
export "go.opentelemetry.io/otel/sdk/export/metric"
@@ -67,7 +68,7 @@ var _ push.Clock = mockClock{}
var _ push.Ticker = mockTicker{}
func newFixture(t *testing.T) testFixture {
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
batcher := &testBatcher{
t: t,
@@ -103,7 +104,7 @@ func (b *testBatcher) FinishedCollection() {
func (b *testBatcher) Process(_ context.Context, record export.Record) error {
b.lock.Lock()
defer b.lock.Unlock()
labels := export.IteratorToSlice(record.Labels().Iter())
labels := record.Labels().ToSlice()
b.checkpointSet.Add(record.Descriptor(), record.Aggregator(), labels...)
return nil
}

View File

@@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
@@ -241,28 +242,33 @@ func TestSDKLabelsDeduplication(t *testing.T) {
sum, _ := rec.Aggregator().(aggregator.Sum).Sum()
require.Equal(t, sum, core.NewInt64Number(2))
kvs := export.IteratorToSlice(rec.Labels().Iter())
kvs := rec.Labels().ToSlice()
actual = append(actual, kvs)
}
require.ElementsMatch(t, allExpect, actual)
}
func TestDefaultLabelEncoder(t *testing.T) {
encoder := export.NewDefaultLabelEncoder()
func newSetIter(kvs ...core.KeyValue) label.Iterator {
labels := label.NewSet(kvs...)
return labels.Iter()
}
encoded := encoder.Encode(export.LabelSlice([]core.KeyValue{key.String("A", "B"), key.String("C", "D")}).Iter())
func TestDefaultLabelEncoder(t *testing.T) {
encoder := label.DefaultEncoder()
encoded := encoder.Encode(newSetIter(key.String("A", "B"), key.String("C", "D")))
require.Equal(t, `A=B,C=D`, encoded)
encoded = encoder.Encode(export.LabelSlice([]core.KeyValue{key.String("A", "B,c=d"), key.String(`C\`, "D")}).Iter())
encoded = encoder.Encode(newSetIter(key.String("A", "B,c=d"), key.String(`C\`, "D")))
require.Equal(t, `A=B\,c\=d,C\\=D`, encoded)
encoded = encoder.Encode(export.LabelSlice([]core.KeyValue{key.String(`\`, `=`), key.String(`,`, `\`)}).Iter())
require.Equal(t, `\\=\=,\,=\\`, encoded)
encoded = encoder.Encode(newSetIter(key.String(`\`, `=`), key.String(`,`, `\`)))
require.Equal(t, `\,=\\,\\=\=`, encoded)
// Note: the label encoder does not sort or de-dup values,
// that is done in Labels(...).
encoded = encoder.Encode(export.LabelSlice([]core.KeyValue{
encoded = encoder.Encode(newSetIter(
key.Int("I", 1),
key.Uint("U", 1),
key.Int32("I32", 1),
@@ -273,8 +279,8 @@ func TestDefaultLabelEncoder(t *testing.T) {
key.Float64("F64", 1),
key.String("S", "1"),
key.Bool("B", true),
}).Iter())
require.Equal(t, "I=1,U=1,I32=1,U32=1,I64=1,U64=1,F64=1,F64=1,S=1,B=true", encoded)
))
require.Equal(t, "B=true,F64=1,I=1,I32=1,I64=1,S=1,U=1,U32=1,U64=1", encoded)
}
func TestObserverCollection(t *testing.T) {
@@ -307,7 +313,7 @@ func TestObserverCollection(t *testing.T) {
require.Equal(t, 4, collected)
require.Equal(t, 4, len(batcher.records))
out := batchTest.NewOutput(export.NewDefaultLabelEncoder())
out := batchTest.NewOutput(label.DefaultEncoder())
for _, rec := range batcher.records {
_ = out.AddTo(rec)
}
@@ -347,7 +353,7 @@ func TestRecordBatch(t *testing.T) {
sdk.Collect(ctx)
out := batchTest.NewOutput(export.NewDefaultLabelEncoder())
out := batchTest.NewOutput(label.DefaultEncoder())
for _, rec := range batcher.records {
_ = out.AddTo(rec)
}

View File

@@ -1,31 +0,0 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import "go.opentelemetry.io/otel/api/core"
type sortedLabels []core.KeyValue
func (l *sortedLabels) Len() int {
return len(*l)
}
func (l *sortedLabels) Swap(i, j int) {
(*l)[i], (*l)[j] = (*l)[j], (*l)[i]
}
func (l *sortedLabels) Less(i, j int) bool {
return (*l)[i].Key < (*l)[j].Key
}

View File

@@ -18,13 +18,12 @@ import (
"context"
"fmt"
"os"
"reflect"
"runtime"
"sort"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
api "go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
@@ -63,42 +62,23 @@ type (
errorHandler ErrorHandler
// resource represents the entity producing telemetry.
resource resource.Resource
resource *resource.Resource
// asyncSortSlice has a single purpose - as a temporary
// place for sorting during labels creation to avoid
// allocation. It is cleared after use.
asyncSortSlice sortedLabels
asyncSortSlice label.Sortable
}
syncInstrument struct {
instrument
}
// orderedLabels is a variable-size array of core.KeyValue
// suitable for use as a map key.
orderedLabels interface{}
// labels represents an internalized set of labels that have been
// sorted and deduplicated.
labels struct {
// cachedEncoderID needs to be aligned for atomic access
cachedEncoderID int64
// cachedEncoded is an encoded version of ordered
// labels
cachedEncoded string
// ordered is the output of sorting and deduplicating
// the labels, copied into an array of the correct
// size for use as a map key.
ordered orderedLabels
}
// mapkey uniquely describes a metric instrument in terms of
// its InstrumentID and the encoded form of its labels.
mapkey struct {
descriptor *metric.Descriptor
ordered orderedLabels
ordered label.Distinct
}
// record maintains the state of one metric instrument. Due
@@ -117,15 +97,21 @@ type (
// supports checking for no updates during a round.
collectedCount int64
// storage is the stored label set for this record,
// except in cases where a label set is shared due to
// batch recording.
storage label.Set
// labels is the processed label set for this record.
//
// labels has to be aligned for 64-bit atomic operations.
labels labels
// this may refer to the `storage` field in another
// record if this label set is shared resulting from
// `RecordBatch`.
labels *label.Set
// sortSlice has a single purpose - as a temporary
// place for sorting during labels creation to avoid
// allocation.
sortSlice sortedLabels
sortSlice label.Sortable
// inst is a pointer to the corresponding instrument.
inst *syncInstrument
@@ -145,14 +131,14 @@ type (
instrument
// recorders maps ordered labels to the pair of
// labelset and recorder
recorders map[orderedLabels]labeledRecorder
recorders map[label.Distinct]*labeledRecorder
callback func(func(core.Number, []core.KeyValue))
}
labeledRecorder struct {
observedEpoch int64
labels labels
labels *label.Set
recorder export.Aggregator
}
@@ -164,15 +150,6 @@ var (
_ api.AsyncImpl = &asyncInstrument{}
_ api.SyncImpl = &syncInstrument{}
_ api.BoundSyncImpl = &record{}
_ api.Resourcer = &SDK{}
_ export.LabelStorage = &labels{}
_ export.Labels = &labels{}
kvType = reflect.TypeOf(core.KeyValue{})
emptyLabels = labels{
ordered: [0]core.KeyValue{},
}
)
func (inst *instrument) Descriptor() api.Descriptor {
@@ -208,9 +185,9 @@ func (a *asyncInstrument) getRecorder(kvs []core.KeyValue) export.Aggregator {
// We are in a single-threaded context. Note: this assumption
// could be violated if the user added concurrency within
// their callback.
labels := a.meter.makeLabels(kvs, &a.meter.asyncSortSlice)
labels := label.NewSetWithSortable(kvs, &a.meter.asyncSortSlice)
lrec, ok := a.recorders[labels.ordered]
lrec, ok := a.recorders[labels.Equivalent()]
if ok {
if lrec.observedEpoch == a.meter.currentEpoch {
// last value wins for Observers, so if we see the same labels
@@ -219,19 +196,19 @@ func (a *asyncInstrument) getRecorder(kvs []core.KeyValue) export.Aggregator {
} else {
lrec.observedEpoch = a.meter.currentEpoch
}
a.recorders[labels.ordered] = lrec
a.recorders[labels.Equivalent()] = lrec
return lrec.recorder
}
rec := a.meter.batcher.AggregatorFor(&a.descriptor)
if a.recorders == nil {
a.recorders = make(map[orderedLabels]labeledRecorder)
a.recorders = make(map[label.Distinct]*labeledRecorder)
}
// This may store nil recorder in the map, thus disabling the
// asyncInstrument for the labelset for good. This is intentional,
// but will be revisited later.
a.recorders[labels.ordered] = labeledRecorder{
a.recorders[labels.Equivalent()] = &labeledRecorder{
recorder: rec,
labels: labels,
labels: &labels,
observedEpoch: a.meter.currentEpoch,
}
return rec
@@ -246,25 +223,27 @@ func (m *SDK) SetErrorHandler(f ErrorHandler) {
// support re-use of the orderedLabels computed by a previous
// measurement in the same batch. This performs two allocations
// in the common case.
func (s *syncInstrument) acquireHandle(kvs []core.KeyValue, lptr *labels) *record {
func (s *syncInstrument) acquireHandle(kvs []core.KeyValue, labelPtr *label.Set) *record {
var rec *record
var labels labels
var equiv label.Distinct
if lptr == nil || lptr.ordered == nil {
if labelPtr == nil {
// This memory allocation may not be used, but it's
// needed for the `sortSlice` field, to avoid an
// allocation while sorting.
rec = &record{}
labels = s.meter.makeLabels(kvs, &rec.sortSlice)
rec.storage = label.NewSetWithSortable(kvs, &rec.sortSlice)
rec.labels = &rec.storage
equiv = rec.storage.Equivalent()
} else {
labels = *lptr
equiv = labelPtr.Equivalent()
}
// Create lookup key for sync.Map (one allocation, as this
// passes through an interface{})
mk := mapkey{
descriptor: &s.descriptor,
ordered: labels.ordered,
ordered: equiv,
}
if actual, ok := s.meter.current.Load(mk); ok {
@@ -280,9 +259,9 @@ func (s *syncInstrument) acquireHandle(kvs []core.KeyValue, lptr *labels) *recor
if rec == nil {
rec = &record{}
rec.labels = labelPtr
}
rec.refMapped = refcountMapped{value: 2}
rec.labels = labels
rec.inst = s
rec.recorder = s.meter.batcher.AggregatorFor(&s.descriptor)
@@ -352,169 +331,6 @@ func DefaultErrorHandler(err error) {
fmt.Fprintln(os.Stderr, "Metrics SDK error:", err)
}
// makeLabels returns a `labels` corresponding to the arguments. Labels
// are sorted and de-duplicated, with last-value-wins semantics. Note that
// sorting and deduplicating happens in-place to avoid allocation, so the
// passed slice will be modified. The `sortSlice` argument refers to a memory
// location used temporarily while sorting the slice, to avoid a memory
// allocation.
func (m *SDK) makeLabels(kvs []core.KeyValue, sortSlice *sortedLabels) labels {
// Check for empty set.
if len(kvs) == 0 {
return emptyLabels
}
*sortSlice = kvs
// Sort and de-duplicate. Note: this use of `sortSlice`
// avoids an allocation because it is a pointer.
sort.Stable(sortSlice)
*sortSlice = nil
oi := 1
for i := 1; i < len(kvs); i++ {
if kvs[i-1].Key == kvs[i].Key {
// Overwrite the value for "last-value wins".
kvs[oi-1].Value = kvs[i].Value
continue
}
kvs[oi] = kvs[i]
oi++
}
kvs = kvs[0:oi]
return computeOrderedLabels(kvs)
}
// NumLabels is a part of an implementation of the export.LabelStorage
// interface.
func (ls *labels) NumLabels() int {
return reflect.ValueOf(ls.ordered).Len()
}
// GetLabel is a part of an implementation of the export.LabelStorage
// interface.
func (ls *labels) GetLabel(idx int) core.KeyValue {
// Note: The Go compiler successfully avoids an allocation for
// the interface{} conversion here:
return reflect.ValueOf(ls.ordered).Index(idx).Interface().(core.KeyValue)
}
// Iter is a part of an implementation of the export.Labels interface.
func (ls *labels) Iter() export.LabelIterator {
return export.NewLabelIterator(ls)
}
// Encoded is a part of an implementation of the export.Labels
// interface.
func (ls *labels) Encoded(encoder export.LabelEncoder) string {
id := encoder.ID()
if id <= 0 {
// Punish misbehaving encoders by not even trying to
// cache them
return encoder.Encode(ls.Iter())
}
cachedID := atomic.LoadInt64(&ls.cachedEncoderID)
// If cached ID is less than zero, it means that other
// goroutine is currently caching the encoded labels and the
// ID of the encoder. Wait until it's done - it's a
// nonblocking op.
for cachedID < 0 {
// Let other goroutine finish its work.
runtime.Gosched()
cachedID = atomic.LoadInt64(&ls.cachedEncoderID)
}
// At this point, cachedID is either 0 (nothing cached) or
// some other number.
//
// If cached ID is the same as ID of the passed encoder, we've
// got the fast path.
if cachedID == id {
return ls.cachedEncoded
}
// If we are here, either some other encoder cached its
// encoded labels or the cache is still for the taking. Either
// way, we need to compute the encoded labels anyway.
encoded := encoder.Encode(ls.Iter())
// If some other encoder took the cache, then we just return
// our encoded labels. That's a slow path.
if cachedID > 0 {
return encoded
}
// Try to take the cache for ourselves. This is the place
// where other encoders may be "blocked".
if atomic.CompareAndSwapInt64(&ls.cachedEncoderID, 0, -1) {
// The cache is ours.
ls.cachedEncoded = encoded
atomic.StoreInt64(&ls.cachedEncoderID, id)
}
return encoded
}
func computeOrderedLabels(kvs []core.KeyValue) labels {
var ls labels
ls.ordered = computeOrderedFixed(kvs)
if ls.ordered == nil {
ls.ordered = computeOrderedReflect(kvs)
}
return ls
}
func computeOrderedFixed(kvs []core.KeyValue) orderedLabels {
switch len(kvs) {
case 1:
ptr := new([1]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 2:
ptr := new([2]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 3:
ptr := new([3]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 4:
ptr := new([4]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 5:
ptr := new([5]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 6:
ptr := new([6]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 7:
ptr := new([7]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 8:
ptr := new([8]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 9:
ptr := new([9]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
case 10:
ptr := new([10]core.KeyValue)
copy((*ptr)[:], kvs)
return *ptr
default:
return nil
}
}
func computeOrderedReflect(kvs []core.KeyValue) interface{} {
at := reflect.New(reflect.ArrayOf(len(kvs), kvType)).Elem()
for i, kv := range kvs {
*(at.Index(i).Addr().Interface().(*core.KeyValue)) = kv
}
return at.Interface()
}
func (m *SDK) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl, error) {
return &syncInstrument{
instrument: instrument{
@@ -611,7 +427,7 @@ func (m *SDK) collectAsync(ctx context.Context) int {
}
func (m *SDK) checkpointRecord(ctx context.Context, r *record) int {
return m.checkpoint(ctx, &r.inst.descriptor, r.recorder, &r.labels)
return m.checkpoint(ctx, &r.inst.descriptor, r.recorder, r.labels)
}
func (m *SDK) checkpointAsync(ctx context.Context, a *asyncInstrument) int {
@@ -623,7 +439,7 @@ func (m *SDK) checkpointAsync(ctx context.Context, a *asyncInstrument) int {
lrec := lrec
epochDiff := m.currentEpoch - lrec.observedEpoch
if epochDiff == 0 {
checkpointed += m.checkpoint(ctx, &a.descriptor, lrec.recorder, &lrec.labels)
checkpointed += m.checkpoint(ctx, &a.descriptor, lrec.recorder, lrec.labels)
} else if epochDiff > 1 {
// This is second collection cycle with no
// observations for this labelset. Remove the
@@ -637,7 +453,7 @@ func (m *SDK) checkpointAsync(ctx context.Context, a *asyncInstrument) int {
return checkpointed
}
func (m *SDK) checkpoint(ctx context.Context, descriptor *metric.Descriptor, recorder export.Aggregator, labels *labels) int {
func (m *SDK) checkpoint(ctx context.Context, descriptor *metric.Descriptor, recorder export.Aggregator, labels *label.Set) int {
if recorder == nil {
return 0
}
@@ -657,7 +473,7 @@ func (m *SDK) checkpoint(ctx context.Context, descriptor *metric.Descriptor, rec
// Resource means that the SDK implements the Resourcer interface and
// therefore all metric instruments it creates will inherit its
// Resource by default unless explicitly overwritten.
func (m *SDK) Resource() resource.Resource {
func (m *SDK) Resource() *resource.Resource {
return m.resource
}
@@ -667,15 +483,15 @@ func (m *SDK) RecordBatch(ctx context.Context, kvs []core.KeyValue, measurements
// called. Subsequent calls to acquireHandle will re-use the
// previously computed value instead of recomputing the
// ordered labels.
var labels labels
var labelsPtr *label.Set
for i, meas := range measurements {
s := meas.SyncImpl().(*syncInstrument)
h := s.acquireHandle(kvs, &labels)
h := s.acquireHandle(kvs, labelsPtr)
// Re-use labels for the next measurement.
if i == 0 {
labels = h.labels
labelsPtr = h.labels
}
defer h.Unbind()
@@ -708,6 +524,6 @@ func (r *record) Unbind() {
func (r *record) mapkey() mapkey {
return mapkey{
descriptor: &r.inst.descriptor,
ordered: r.labels.ordered,
ordered: r.labels.Equivalent(),
}
}

View File

@@ -265,7 +265,7 @@ func (*testFixture) FinishedCollection() {
}
func (f *testFixture) Process(_ context.Context, record export.Record) error {
labels := export.IteratorToSlice(record.Labels().Iter())
labels := record.Labels().ToSlice()
key := testKey{
labels: canonicalizeLabels(labels),
descriptor: record.Descriptor(),

View File

@@ -1,65 +0,0 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package resource
import "go.opentelemetry.io/otel/api/core"
// AttributeIterator allows iterating over an ordered set of Resource attributes.
//
// The typical use of the iterator assuming a Resource named `res`, is
// something like the following:
//
// for iter := res.Iter(); iter.Next(); {
// attr := iter.Attribute()
// // or, if an index is needed:
// // idx, attr := iter.IndexedAttribute()
//
// // ...
// }
type AttributeIterator struct {
attrs []core.KeyValue
idx int
}
// NewAttributeIterator creates an iterator going over a passed attrs.
func NewAttributeIterator(attrs []core.KeyValue) AttributeIterator {
return AttributeIterator{attrs: attrs, idx: -1}
}
// Next moves the iterator to the next attribute.
// Returns false if there are no more attributes.
func (i *AttributeIterator) Next() bool {
i.idx++
return i.idx < i.Len()
}
// Attribute returns current attribute.
//
// Must be called only after Next returns true.
func (i *AttributeIterator) Attribute() core.KeyValue {
return i.attrs[i.idx]
}
// IndexedAttribute returns current index and attribute.
//
// Must be called only after Next returns true.
func (i *AttributeIterator) IndexedAttribute() (int, core.KeyValue) {
return i.idx, i.Attribute()
}
// Len returns a number of attributes.
func (i *AttributeIterator) Len() int {
return len(i.attrs)
}

View File

@@ -17,101 +17,119 @@
package resource
import (
"encoding/json"
"sort"
"strings"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/label"
)
// Resource describes an entity about which identifying information and metadata is exposed.
// Resource describes an entity about which identifying information
// and metadata is exposed. Resource is an immutable object,
// equivalent to a map from key to unique value.
//
// Resources should be passed and stored as pointers
// (`*resource.Resource`). The `nil` value is equivalent to an empty
// Resource.
type Resource struct {
sorted []core.KeyValue
keySet map[core.Key]struct{}
labels label.Set
}
// New creates a resource from a set of attributes.
// If there are duplicates keys then the first value of the key is preserved.
var emptyResource Resource
// New creates a resource from a set of attributes. If there are
// duplicate keys present in the list of attributes, then the last
// value found for the key is preserved.
func New(kvs ...core.KeyValue) *Resource {
res := &Resource{keySet: make(map[core.Key]struct{})}
for _, kv := range kvs {
// First key wins.
if _, ok := res.keySet[kv.Key]; !ok {
res.keySet[kv.Key] = struct{}{}
res.sorted = append(res.sorted, kv)
return &Resource{
labels: label.NewSet(kvs...),
}
}
sort.Slice(res.sorted, func(i, j int) bool {
return res.sorted[i].Key < res.sorted[j].Key
})
return res
}
// String implements the Stringer interface and provides a reproducibly
// hashable representation of a Resource.
func (r Resource) String() string {
// Ensure unique strings if key/value contains '=', ',', or '\'.
escaper := strings.NewReplacer("=", `\=`, ",", `\,`, `\`, `\\`)
var b strings.Builder
// Note: this could be further optimized by precomputing the size of
// the resulting buffer and adding a call to b.Grow
b.WriteString("Resource(")
if len(r.sorted) > 0 {
b.WriteString(escaper.Replace(string(r.sorted[0].Key)))
b.WriteRune('=')
b.WriteString(escaper.Replace(r.sorted[0].Value.Emit()))
for _, s := range r.sorted[1:] {
b.WriteRune(',')
b.WriteString(escaper.Replace(string(s.Key)))
b.WriteRune('=')
b.WriteString(escaper.Replace(s.Value.Emit()))
// String implements the Stringer interface and provides a
// human-readable form of the resource.
//
// Avoid using this representation as the key in a map of resources,
// use Equivalent() as the key instead.
func (r *Resource) String() string {
if r == nil {
return ""
}
}
b.WriteRune(')')
return b.String()
return r.labels.Encoded(label.DefaultEncoder())
}
// Attributes returns a copy of attributes from the resource in a sorted order.
func (r Resource) Attributes() []core.KeyValue {
return append([]core.KeyValue(nil), r.sorted...)
// To avoid allocating a new slice, use an iterator.
func (r *Resource) Attributes() []core.KeyValue {
if r == nil {
r = Empty()
}
return r.labels.ToSlice()
}
// Iter returns an interator of the Resource attributes.
//
// This is ideal to use if you do not want a copy of the attributes.
func (r Resource) Iter() AttributeIterator {
return NewAttributeIterator(r.sorted)
func (r *Resource) Iter() label.Iterator {
if r == nil {
r = Empty()
}
return r.labels.Iter()
}
// Equal returns true if other Resource is equal to r.
func (r Resource) Equal(other Resource) bool {
return r.String() == other.String()
// Equal returns true when a Resource is equivalent to this Resource.
func (r *Resource) Equal(eq *Resource) bool {
if r == nil {
r = Empty()
}
if eq == nil {
eq = Empty()
}
return r.Equivalent() == eq.Equivalent()
}
// Merge creates a new resource by combining resource a and b.
// If there are common key between resource a and b then value from resource a is preserved.
// If one of the resources is nil then the other resource is returned without creating a new one.
//
// If there are common keys between resource a and b, then the value
// from resource a is preserved.
func Merge(a, b *Resource) *Resource {
if a == nil {
return b
a = Empty()
}
if b == nil {
return a
b = Empty()
}
// Note: 'b' is listed first so that 'a' will overwrite with
// last-value-wins in label.New()
combine := append(b.Attributes(), a.Attributes()...)
return New(combine...)
}
// Note: the following could be optimized by implementing a dedicated merge sort.
kvs := make([]core.KeyValue, 0, len(a.sorted)+len(b.sorted))
kvs = append(kvs, a.sorted...)
// a overwrites b, so b needs to be at the end.
kvs = append(kvs, b.sorted...)
return New(kvs...)
// Empty returns an instance of Resource with no attributes. It is
// equivalent to a `nil` Resource.
func Empty() *Resource {
return &emptyResource
}
// MarshalJSON prints the resource attributes in sorted order.
func (r Resource) MarshalJSON() ([]byte, error) {
return json.Marshal(r.sorted)
// Equivalent returns an object that can be compared for equality
// between two resources. This value is suitable for use as a key in
// a map.
func (r *Resource) Equivalent() label.Distinct {
if r == nil {
r = Empty()
}
return r.labels.Equivalent()
}
// MarshalJSON encodes labels as a JSON list of { "Key": "...", "Value": ... }
// pairs in order sorted by key.
func (r *Resource) MarshalJSON() ([]byte, error) {
if r == nil {
r = Empty()
}
return r.labels.MarshalJSON()
}
// Len returns the number of unique key-values in this Resource.
func (r *Resource) Len() int {
if r == nil {
return 0
}
return r.labels.Len()
}

View File

@@ -43,12 +43,12 @@ func TestNew(t *testing.T) {
}{
{
name: "New with common key order1",
in: []core.KeyValue{kv11, kv12, kv21},
in: []core.KeyValue{kv12, kv11, kv21},
want: []core.KeyValue{kv11, kv21},
},
{
name: "New with common key order2",
in: []core.KeyValue{kv12, kv11, kv21},
in: []core.KeyValue{kv11, kv12, kv21},
want: []core.KeyValue{kv12, kv21},
},
{
@@ -157,55 +157,55 @@ func TestString(t *testing.T) {
}{
{
kvs: nil,
want: "Resource()",
want: "",
},
{
kvs: []core.KeyValue{},
want: "Resource()",
want: "",
},
{
kvs: []core.KeyValue{kv11},
want: "Resource(k1=v11)",
want: "k1=v11",
},
{
kvs: []core.KeyValue{kv11, kv12},
want: "Resource(k1=v11)",
want: "k1=v12",
},
{
kvs: []core.KeyValue{kv11, kv21},
want: "Resource(k1=v11,k2=v21)",
want: "k1=v11,k2=v21",
},
{
kvs: []core.KeyValue{kv21, kv11},
want: "Resource(k1=v11,k2=v21)",
want: "k1=v11,k2=v21",
},
{
kvs: []core.KeyValue{kv11, kv21, kv31},
want: "Resource(k1=v11,k2=v21,k3=v31)",
want: "k1=v11,k2=v21,k3=v31",
},
{
kvs: []core.KeyValue{kv31, kv11, kv21},
want: "Resource(k1=v11,k2=v21,k3=v31)",
want: "k1=v11,k2=v21,k3=v31",
},
{
kvs: []core.KeyValue{key.String("A", "a"), key.String("B", "b")},
want: "Resource(A=a,B=b)",
want: "A=a,B=b",
},
{
kvs: []core.KeyValue{key.String("A", "a,B=b")},
want: `Resource(A=a\,B\=b)`,
want: `A=a\,B\=b`,
},
{
kvs: []core.KeyValue{key.String("A", `a,B\=b`)},
want: `Resource(A=a\,B\\\=b)`,
want: `A=a\,B\\\=b`,
},
{
kvs: []core.KeyValue{key.String("A=a,B", `b`)},
want: `Resource(A\=a\,B=b)`,
want: `A\=a\,B=b`,
},
{
kvs: []core.KeyValue{key.String(`A=a\,B`, `b`)},
want: `Resource(A\=a\\\,B=b)`,
want: `A\=a\\\,B=b`,
},
} {
if got := resource.New(test.kvs...).String(); got != test.want {

View File

@@ -163,7 +163,7 @@ func (p *Provider) ApplyConfig(cfg Config) {
c.MaxLinksPerSpan = cfg.MaxLinksPerSpan
}
if cfg.Resource != nil {
c.Resource = resource.New(cfg.Resource.Attributes()...)
c.Resource = cfg.Resource
}
p.config.Store(&c)
}

View File

@@ -601,8 +601,7 @@ func TestSetSpanStatus(t *testing.T) {
func cmpDiff(x, y interface{}) string {
return cmp.Diff(x, y,
cmp.AllowUnexported(core.Value{}),
cmp.AllowUnexported(export.Event{}),
cmp.AllowUnexported(resource.Resource{}))
cmp.AllowUnexported(export.Event{}))
}
func remoteSpanContext() core.SpanContext {