diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 0cb5eee47..31691f552 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math" + "strings" "testing" "github.com/stretchr/testify/require" @@ -30,19 +31,26 @@ import ( sdk "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/array" "go.opentelemetry.io/otel/sdk/metric/aggregator/counter" - "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" ) type correctnessBatcher struct { - t *testing.T - agg export.Aggregator + t *testing.T + records []export.Record } type testLabelEncoder struct{} -func (cb *correctnessBatcher) AggregatorFor(*export.Descriptor) export.Aggregator { - return cb.agg +func (cb *correctnessBatcher) AggregatorFor(descriptor *export.Descriptor) export.Aggregator { + name := descriptor.Name() + switch { + case strings.HasSuffix(name, ".counter"): + return counter.New() + case strings.HasSuffix(name, ".disabled"): + return nil + default: + return array.New() + } } func (cb *correctnessBatcher) CheckpointSet() export.CheckpointSet { @@ -64,10 +72,8 @@ func (testLabelEncoder) Encode(labels []core.KeyValue) string { func TestInputRangeTestCounter(t *testing.T) { ctx := context.Background() - cagg := counter.New() batcher := &correctnessBatcher{ - t: t, - agg: cagg, + t: t, } sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) @@ -76,21 +82,22 @@ func TestInputRangeTestCounter(t *testing.T) { sdkErr = handleErr }) - counter := sdk.NewInt64Counter("counter.name", metric.WithMonotonic(true)) + counter := sdk.NewInt64Counter("name.counter", metric.WithMonotonic(true)) counter.Add(ctx, -1, sdk.Labels()) require.Equal(t, aggregator.ErrNegativeInput, sdkErr) sdkErr = nil - sdk.Collect(ctx) - sum, err := cagg.Sum() + checkpointed := sdk.Collect(ctx) + sum, err := batcher.records[0].Aggregator().(aggregator.Sum).Sum() require.Equal(t, int64(0), sum.AsInt64()) + require.Equal(t, 1, checkpointed) require.Nil(t, err) + batcher.records = nil counter.Add(ctx, 1, sdk.Labels()) - checkpointed := sdk.Collect(ctx) - - sum, err = cagg.Sum() + checkpointed = sdk.Collect(ctx) + sum, err = batcher.records[0].Aggregator().(aggregator.Sum).Sum() require.Equal(t, int64(1), sum.AsInt64()) require.Equal(t, 1, checkpointed) require.Nil(t, err) @@ -99,10 +106,8 @@ func TestInputRangeTestCounter(t *testing.T) { func TestInputRangeTestMeasure(t *testing.T) { ctx := context.Background() - magg := array.New() batcher := &correctnessBatcher{ - t: t, - agg: magg, + t: t, } sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) @@ -111,22 +116,25 @@ func TestInputRangeTestMeasure(t *testing.T) { sdkErr = handleErr }) - measure := sdk.NewFloat64Measure("measure.name", metric.WithAbsolute(true)) + measure := sdk.NewFloat64Measure("name.measure", metric.WithAbsolute(true)) measure.Record(ctx, -1, sdk.Labels()) require.Equal(t, aggregator.ErrNegativeInput, sdkErr) sdkErr = nil - sdk.Collect(ctx) - count, err := magg.Count() + checkpointed := sdk.Collect(ctx) + count, err := batcher.records[0].Aggregator().(aggregator.Distribution).Count() require.Equal(t, int64(0), count) + require.Equal(t, 1, checkpointed) require.Nil(t, err) measure.Record(ctx, 1, sdk.Labels()) measure.Record(ctx, 2, sdk.Labels()) - checkpointed := sdk.Collect(ctx) - count, err = magg.Count() + batcher.records = nil + checkpointed = sdk.Collect(ctx) + + count, err = batcher.records[0].Aggregator().(aggregator.Distribution).Count() require.Equal(t, int64(2), count) require.Equal(t, 1, checkpointed) require.Nil(t, sdkErr) @@ -136,23 +144,22 @@ func TestInputRangeTestMeasure(t *testing.T) { func TestDisabledInstrument(t *testing.T) { ctx := context.Background() batcher := &correctnessBatcher{ - t: t, - agg: nil, + t: t, } sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) - measure := sdk.NewFloat64Measure("measure.name", metric.WithAbsolute(true)) + measure := sdk.NewFloat64Measure("name.disabled", metric.WithAbsolute(true)) measure.Record(ctx, -1, sdk.Labels()) checkpointed := sdk.Collect(ctx) require.Equal(t, 0, checkpointed) + require.Equal(t, 0, len(batcher.records)) } func TestRecordNaN(t *testing.T) { ctx := context.Background() batcher := &correctnessBatcher{ - t: t, - agg: lastvalue.New(), + t: t, } sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) @@ -167,12 +174,10 @@ func TestRecordNaN(t *testing.T) { require.Error(t, sdkErr) } -func TestSDKLabelEncoder(t *testing.T) { +func TestSDKAltLabelEncoder(t *testing.T) { ctx := context.Background() - cagg := counter.New() batcher := &correctnessBatcher{ - t: t, - agg: cagg, + t: t, } sdk := sdk.New(batcher, testLabelEncoder{}) @@ -187,6 +192,73 @@ func TestSDKLabelEncoder(t *testing.T) { require.Equal(t, `[{A {8 0 B}} {C {8 0 D}}]`, labels.Encoded()) } +func TestSDKLabelsDeduplication(t *testing.T) { + ctx := context.Background() + batcher := &correctnessBatcher{ + t: t, + } + sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) + + counter := sdk.NewInt64Counter("counter") + + const ( + maxKeys = 21 + keySets = 2 + repeats = 3 + ) + var keysA []core.Key + var keysB []core.Key + + for i := 0; i < maxKeys; i++ { + keysA = append(keysA, core.Key(fmt.Sprintf("A%03d", i))) + keysB = append(keysB, core.Key(fmt.Sprintf("B%03d", i))) + } + + var allExpect [][]core.KeyValue + for numKeys := 0; numKeys < maxKeys; numKeys++ { + + var kvsA []core.KeyValue + var kvsB []core.KeyValue + for r := 0; r < repeats; r++ { + for i := 0; i < numKeys; i++ { + kvsA = append(kvsA, keysA[i].Int(r)) + kvsB = append(kvsB, keysB[i].Int(r)) + } + } + + var expectA []core.KeyValue + var expectB []core.KeyValue + for i := 0; i < numKeys; i++ { + expectA = append(expectA, keysA[i].Int(repeats-1)) + expectB = append(expectB, keysB[i].Int(repeats-1)) + } + + counter.Add(ctx, 1, sdk.Labels(kvsA...)) + counter.Add(ctx, 1, sdk.Labels(kvsA...)) + allExpect = append(allExpect, expectA) + + if numKeys != 0 { + // In this case A and B sets are the same. + counter.Add(ctx, 1, sdk.Labels(kvsB...)) + counter.Add(ctx, 1, sdk.Labels(kvsB...)) + allExpect = append(allExpect, expectB) + } + + } + + sdk.Collect(ctx) + + var actual [][]core.KeyValue + for _, rec := range batcher.records { + sum, _ := rec.Aggregator().(aggregator.Sum).Sum() + require.Equal(t, sum, core.NewInt64Number(2)) + + actual = append(actual, rec.Labels().Ordered()) + } + + require.ElementsMatch(t, allExpect, actual) +} + func TestDefaultLabelEncoder(t *testing.T) { encoder := sdk.NewDefaultLabelEncoder() diff --git a/sdk/metric/list.go b/sdk/metric/list.go index 078781c99..9c274a9cc 100644 --- a/sdk/metric/list.go +++ b/sdk/metric/list.go @@ -14,6 +14,10 @@ package metric +import "go.opentelemetry.io/otel/api/core" + +type sortedLabels []core.KeyValue + func (l *sortedLabels) Len() int { return len(*l) } diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index f4e8c216b..5a85ac703 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "os" + "reflect" "runtime" "sort" "sync" @@ -72,23 +73,30 @@ type ( meter *SDK } - // sortedLabels are used to de-duplicate and canonicalize labels. - sortedLabels []core.KeyValue + // orderedLabels is a variable-size array of core.KeyValue + // suitable for use as a map key. + orderedLabels interface{} // labels implements the OpenTelemetry LabelSet API, // represents an internalized set of labels that may be used // repeatedly. labels struct { - meter *SDK - sorted sortedLabels - encoded string + meter *SDK + + // slice is a slice of `ordered`. + slice sortedLabels + + // ordered is the output of sorting and deduplicating + // the labels, copied into an array of the correct + // size for use as a map key. + ordered orderedLabels } // mapkey uniquely describes a metric instrument in terms of // its InstrumentID and the encoded form of its LabelSet. mapkey struct { descriptor *export.Descriptor - encoded string + ordered orderedLabels } // record maintains the state of one metric instrument. Due @@ -135,9 +143,9 @@ type ( observer struct { meter *SDK descriptor *export.Descriptor - // recorders maps encoded labelset to the pair of + // recorders maps ordered labels to the pair of // labelset and recorder - recorders map[string]labeledRecorder + recorders map[orderedLabels]labeledRecorder callback observerCallback } @@ -167,6 +175,8 @@ var ( _ api.Float64Observer = float64Observer{} _ api.Int64ObserverResult = int64ObserverResult{} _ api.Float64ObserverResult = float64ObserverResult{} + + kvType = reflect.TypeOf(core.KeyValue{}) ) func (r observerResult) observe(number core.Number, ls api.LabelSet) { @@ -192,20 +202,20 @@ func (o *observer) recordOne(number core.Number, ls api.LabelSet) { func (o *observer) getRecorder(ls api.LabelSet) export.Aggregator { labels := o.meter.labsFor(ls) - lrec, ok := o.recorders[labels.encoded] + lrec, ok := o.recorders[labels.ordered] if ok { lrec.modifiedEpoch = o.meter.currentEpoch - o.recorders[labels.encoded] = lrec + o.recorders[labels.ordered] = lrec return lrec.recorder } rec := o.meter.batcher.AggregatorFor(o.descriptor) if o.recorders == nil { - o.recorders = make(map[string]labeledRecorder) + o.recorders = make(map[orderedLabels]labeledRecorder) } // This may store nil recorder in the map, thus disabling the // observer for the labelset for good. This is intentional, // but will be revisited later. - o.recorders[labels.encoded] = labeledRecorder{ + o.recorders[labels.ordered] = labeledRecorder{ recorder: rec, labels: labels, modifiedEpoch: o.meter.currentEpoch, @@ -245,7 +255,7 @@ func (i *instrument) acquireHandle(ls *labels) *record { // Create lookup key for sync.Map (one allocation) mk := mapkey{ descriptor: i.descriptor, - encoded: ls.encoded, + ordered: ls.ordered, } if actual, ok := i.meter.current.Load(mk); ok { @@ -336,41 +346,109 @@ func DefaultErrorHandler(err error) { // Labels returns a LabelSet corresponding to the arguments. Passed // labels are de-duplicated, with last-value-wins semantics. func (m *SDK) Labels(kvs ...core.KeyValue) api.LabelSet { - // Note: This computes a canonical encoding of the labels to - // use as a map key. It happens to use the encoding used by - // statsd for labels, allowing an optimization for statsd - // batchers. This could be made configurable in the - // constructor, to support the same optimization for different - // batchers. - // Check for empty set. if len(kvs) == 0 { return &m.empty } - ls := &labels{ - meter: m, - sorted: kvs, + ls := &labels{ // allocation + meter: m, + slice: kvs, } - // Sort and de-duplicate. - sort.Stable(&ls.sorted) + // Sort and de-duplicate. Note: this use of `ls.slice` avoids + // an allocation by using the address-able field rather than + // `kvs`. Labels retains a copy of this slice, i.e., the + // initial allocation at the varargs call site. + // + // Note that `ls.slice` continues to refer to this memory, + // even though a new array is allocated for `ls.ordered`. It + // is possible for the `slice` to refer to the same memory, + // although in the reflection code path of `computeOrdered` it + // costs an allocation to yield a slice through + // `(reflect.Value).Interface()`. + // + // TODO: There is a possibility that the caller passes values + // without an allocation (e.g., `meter.Labels(kvs...)`), and + // that the user could later modify the slice, leading to + // incorrect results. This is indeed a risk, one that should + // be quickly addressed via the following TODO. + // + // TODO: It would be better overall if the export.Labels interface + // did not expose a slice via `Ordered()`, if instead it exposed + // getter methods like `Len()` and `Order(i int)`. Then we would + // just implement the interface using the `orderedLabels` array. + sort.Stable(&ls.slice) + oi := 1 - for i := 1; i < len(ls.sorted); i++ { - if ls.sorted[i-1].Key == ls.sorted[i].Key { - ls.sorted[oi-1] = ls.sorted[i] + for i := 1; i < len(kvs); i++ { + if kvs[i-1].Key == kvs[i].Key { + // Overwrite the value for "last-value wins". + kvs[oi-1].Value = kvs[i].Value continue } - ls.sorted[oi] = ls.sorted[i] + kvs[oi] = kvs[i] oi++ } - ls.sorted = ls.sorted[0:oi] - - ls.encoded = m.labelEncoder.Encode(ls.sorted) - + kvs = kvs[0:oi] + ls.slice = kvs + ls.computeOrdered(kvs) return ls } +func (ls *labels) computeOrdered(kvs []core.KeyValue) { + switch len(kvs) { + case 1: + ptr := new([1]core.KeyValue) + copy((*ptr)[:], kvs) + ls.ordered = *ptr + case 2: + ptr := new([2]core.KeyValue) + copy((*ptr)[:], kvs) + ls.ordered = *ptr + case 3: + ptr := new([3]core.KeyValue) + copy((*ptr)[:], kvs) + ls.ordered = *ptr + case 4: + ptr := new([4]core.KeyValue) + copy((*ptr)[:], kvs) + ls.ordered = *ptr + case 5: + ptr := new([5]core.KeyValue) + copy((*ptr)[:], kvs) + ls.ordered = *ptr + case 6: + ptr := new([6]core.KeyValue) + copy((*ptr)[:], kvs) + ls.ordered = *ptr + case 7: + ptr := new([7]core.KeyValue) + copy((*ptr)[:], kvs) + ls.ordered = *ptr + case 8: + ptr := new([8]core.KeyValue) + copy((*ptr)[:], kvs) + ls.ordered = *ptr + case 9: + ptr := new([9]core.KeyValue) + copy((*ptr)[:], kvs) + ls.ordered = *ptr + case 10: + ptr := new([10]core.KeyValue) + copy((*ptr)[:], kvs) + ls.ordered = *ptr + default: + at := reflect.New(reflect.ArrayOf(len(kvs), kvType)).Elem() + + for i := 0; i < len(kvs); i++ { + *(at.Index(i).Addr().Interface().(*core.KeyValue)) = kvs[i] + } + + ls.ordered = at.Interface() + } +} + // labsFor sanitizes the input LabelSet. The input will be rejected // if it was created by another Meter instance, for example. func (m *SDK) labsFor(ls api.LabelSet) *labels { @@ -578,7 +656,12 @@ func (m *SDK) checkpoint(ctx context.Context, descriptor *export.Descriptor, rec return 0 } recorder.Checkpoint(ctx, descriptor) - exportLabels := export.NewLabels(labels.sorted, labels.encoded, m.labelEncoder) + + // TODO Labels are encoded once per collection interval, + // instead of once per bound instrument lifetime. This can be + // addressed similarly to OTEP 78, see + // https://github.com/jmacd/opentelemetry-go/blob/8bed2e14df7f9f4688fbab141924bb786dc9a3a1/api/context/internal/set.go#L89 + exportLabels := export.NewLabels(labels.slice, m.labelEncoder.Encode(labels.slice), m.labelEncoder) exportRecord := export.NewRecord(descriptor, exportLabels, recorder) err := m.batcher.Process(ctx, exportRecord) if err != nil { @@ -629,6 +712,6 @@ func (r *record) Unbind() { func (r *record) mapkey() mapkey { return mapkey{ descriptor: r.descriptor, - encoded: r.labels.encoded, + ordered: r.labels.ordered, } }