Remove WithKeys() option, defaultkeys batcher (#639)
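This change removes the WithKeys() instrument option from the metric API and deletes the defaultkeys batcher from the SDK: instruments no longer carry a set of recommended label keys, and the export pipelines below switch to the ungrouped batcher. A minimal before/after sketch of instrument creation, assuming the meter and context setup used in the examples in this diff:

    // Before this commit, recommended keys could be attached at creation time:
    //   counter := metric.Must(meter).NewInt64Counter("a.counter", metric.WithKeys(key.New("key")))
    // After this commit the option is gone; labels are supplied only when recording:
    counter := metric.Must(meter).NewInt64Counter("a.counter")
    counter.Add(ctx, 100, key.String("key", "value"))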
@@ -38,9 +38,6 @@ type Config struct {
    Description string
    // Unit is an optional field describing the metric instrument.
    Unit unit.Unit
-    // Keys are recommended keys determined in the handles
-    // obtained for the metric.
-    Keys []core.Key
    // Resource describes the entity for which measurements are made.
    Resource resource.Resource
    // LibraryName is the name given to the Meter that created
@@ -117,13 +114,6 @@ func (d Descriptor) MetricKind() Kind {
    return d.kind
}

-// Keys returns the recommended keys included in the metric
-// definition. These keys may be used by a Batcher as a default set
-// of grouping keys for the metric instrument.
-func (d Descriptor) Keys() []core.Key {
-    return d.config.Keys
-}
-
// Description provides a human-readable description of the metric
// instrument.
func (d Descriptor) Description() string {
@@ -210,18 +200,6 @@ func (u unitOption) Apply(config *Config) {
    config.Unit = unit.Unit(u)
}

-// WithKeys applies recommended label keys. Multiple `WithKeys`
-// options accumulate.
-func WithKeys(keys ...core.Key) Option {
-    return keysOption(keys)
-}
-
-type keysOption []core.Key
-
-func (k keysOption) Apply(config *Config) {
-    config.Keys = append(config.Keys, k...)
-}
-
// WithResource applies provided Resource.
//
// This will override any existing Resource.
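With WithKeys and keysOption removed, the remaining instrument options are WithDescription, WithUnit, and WithResource. A short sketch of how they combine into a Config, following the pattern exercised by TestOptions below (later options of the same kind override earlier ones):

    cfg := metric.Configure([]metric.Option{
        metric.WithDescription("stuff"),
        metric.WithUnit("s"),
        metric.WithResource(*resource.New(key.New("name").String("test-name"))),
    })
    // cfg.Description == "stuff", cfg.Unit == "s", and cfg.Resource holds the
    // "name=test-name" resource; there is no Keys field left to populate.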
@@ -38,7 +38,6 @@ func TestOptions(t *testing.T) {
    type testcase struct {
        name string
        opts []metric.Option
-        keys []core.Key
        desc string
        unit unit.Unit
        resource resource.Resource
@@ -47,23 +46,6 @@ func TestOptions(t *testing.T) {
        {
            name: "no opts",
            opts: nil,
-            keys: nil,
            desc: "",
            unit: "",
            resource: resource.Resource{},
-        },
-        {
-            name: "keys keys keys",
-            opts: []metric.Option{
-                metric.WithKeys(key.New("foo"), key.New("foo2")),
-                metric.WithKeys(key.New("bar"), key.New("bar2")),
-                metric.WithKeys(key.New("baz"), key.New("baz2")),
-            },
-            keys: []core.Key{
-                key.New("foo"), key.New("foo2"),
-                key.New("bar"), key.New("bar2"),
-                key.New("baz"), key.New("baz2"),
-            },
-            desc: "",
-            unit: "",
-            resource: resource.Resource{},
@@ -73,7 +55,6 @@ func TestOptions(t *testing.T) {
            opts: []metric.Option{
                metric.WithDescription("stuff"),
            },
-            keys: nil,
            desc: "stuff",
            unit: "",
            resource: resource.Resource{},
@@ -84,7 +65,6 @@ func TestOptions(t *testing.T) {
                metric.WithDescription("stuff"),
                metric.WithDescription("things"),
            },
-            keys: nil,
            desc: "things",
            unit: "",
            resource: resource.Resource{},
@@ -94,7 +74,6 @@ func TestOptions(t *testing.T) {
            opts: []metric.Option{
                metric.WithUnit("s"),
            },
-            keys: nil,
            desc: "",
            unit: "s",
            resource: resource.Resource{},
@@ -105,7 +84,6 @@ func TestOptions(t *testing.T) {
                metric.WithUnit("s"),
                metric.WithUnit("h"),
            },
-            keys: nil,
            desc: "",
            unit: "h",
            resource: resource.Resource{},
@@ -115,7 +93,6 @@ func TestOptions(t *testing.T) {
            opts: []metric.Option{
                metric.WithResource(*resource.New(key.New("name").String("test-name"))),
            },
-            keys: nil,
            desc: "",
            unit: "",
            resource: *resource.New(key.New("name").String("test-name")),
@@ -126,7 +103,6 @@ func TestOptions(t *testing.T) {
        if diff := cmp.Diff(metric.Configure(tt.opts), metric.Config{
            Description: tt.desc,
            Unit: tt.unit,
-            Keys: tt.keys,
            Resource: tt.resource,
        }); diff != "" {
            t.Errorf("Compare options: -got +want %s", diff)
@@ -78,7 +78,6 @@ func main() {
            result.Observe(1, commonLabels...)
        }
    _ = metric.Must(meter).RegisterFloat64Observer("ex.com.one", oneMetricCB,
-        metric.WithKeys(fooKey, barKey, lemonsKey),
        metric.WithDescription("An observer set to 1.0"),
    )

@@ -30,8 +30,6 @@ import (
)

var (
-    fooKey = key.New("ex.com/foo")
-    barKey = key.New("ex.com/bar")
    lemonsKey = key.New("ex.com/lemons")
)

@@ -63,11 +61,10 @@ func main() {
        result.Observe(value, labels...)
    }
    _ = metric.Must(meter).RegisterFloat64Observer("ex.com.one", cb,
-        metric.WithKeys(fooKey, barKey, lemonsKey),
        metric.WithDescription("A measure set to 1.0"),
    )

-    measureTwo := metric.Must(meter).NewFloat64Measure("ex.com.two", metric.WithKeys(key.New("A")))
+    measureTwo := metric.Must(meter).NewFloat64Measure("ex.com.two")
    measureThree := metric.Must(meter).NewFloat64Counter("ex.com.three")

    commonLabels := []core.KeyValue{lemonsKey.Int(10), key.String("A", "1"), key.String("B", "2"), key.String("C", "3")}
@@ -27,7 +27,7 @@ import (
    "go.opentelemetry.io/otel/api/global"
    export "go.opentelemetry.io/otel/sdk/export/metric"
    "go.opentelemetry.io/otel/sdk/export/metric/aggregator"
-    "go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys"
+    "go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
    "go.opentelemetry.io/otel/sdk/metric/controller/push"
    "go.opentelemetry.io/otel/sdk/metric/selector/simple"
)
@@ -158,7 +158,7 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, h
    // it could try again on the next scrape and no data would be lost, only resolution.
    //
    // Gauges (or LastValues) and Summaries are an exception to this and have different behaviors.
-    batcher := defaultkeys.New(selector, export.NewDefaultLabelEncoder(), true)
+    batcher := ungrouped.New(selector, export.NewDefaultLabelEncoder(), true)
    pusher := push.New(batcher, exporter, period)
    pusher.Start()

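With defaultkeys gone, the export pipeline wires the ungrouped batcher in its place. A sketch of the construction as it reads after this hunk (selector, exporter, and period come from the surrounding NewExportPipeline function and are assumed here):

    batcher := ungrouped.New(selector, export.NewDefaultLabelEncoder(), true)
    pusher := push.New(batcher, exporter, period)
    pusher.Start()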
@@ -42,7 +42,7 @@ func ExampleNewExportPipeline() {
    meter := pusher.Meter("example")

    // Create and update a single counter:
-    counter := metric.Must(meter).NewInt64Counter("a.counter", metric.WithKeys(key))
+    counter := metric.Must(meter).NewInt64Counter("a.counter")
    labels := []core.KeyValue{key.String("value")}

    counter.Add(ctx, 100, labels...)
@@ -23,7 +23,6 @@ import (
    "strings"
    "time"

-    "go.opentelemetry.io/otel/api/core"
    "go.opentelemetry.io/otel/api/global"

    export "go.opentelemetry.io/otel/sdk/export/metric"
@@ -214,33 +213,19 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
            }
        }

-        specifiedKeyMap := make(map[core.Key]core.Value)
+        var encodedLabels string
        iter := record.Labels().Iter()
-        for iter.Next() {
-            kv := iter.Label()
-            specifiedKeyMap[kv.Key] = kv.Value
-        }
-
-        var materializedKeys []string
-
        if iter.Len() > 0 {
-            encoded := record.Labels().Encoded(e.config.LabelEncoder)
-            materializedKeys = append(materializedKeys, encoded)
-        }
-
-        for _, k := range desc.Keys() {
-            if _, ok := specifiedKeyMap[k]; !ok {
-                materializedKeys = append(materializedKeys, string(k))
-            }
+            encodedLabels = record.Labels().Encoded(e.config.LabelEncoder)
        }

        var sb strings.Builder

        sb.WriteString(desc.Name())

-        if len(materializedKeys) > 0 {
+        if len(encodedLabels) > 0 {
            sb.WriteRune('{')
-            sb.WriteString(strings.Join(materializedKeys, ","))
+            sb.WriteString(encodedLabels)
            sb.WriteRune('}')
        }

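The stdout exporter no longer materializes a descriptor's recommended keys into the printed name; it emits only the labels that were actually recorded, encoded once. A condensed sketch of the resulting formatting, reusing names from the new code above (record, desc, and e.config belong to the surrounding Export method):

    var sb strings.Builder
    sb.WriteString(desc.Name())
    encodedLabels := record.Labels().Encoded(e.config.LabelEncoder)
    if len(encodedLabels) > 0 {
        sb.WriteRune('{')
        sb.WriteString(encodedLabels)
        sb.WriteRune('}')
    }
    // A counter recorded with label A=B now prints as test.name{A=B} rather
    // than test.name{A=B,C,D} for unspecified recommended keys C and D,
    // which is why the test below is deleted.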
@@ -276,22 +276,3 @@ func TestStdoutLastValueNotSet(t *testing.T) {

    require.Equal(t, `{"updates":null}`, fix.Output())
}
-
-func TestStdoutCounterWithUnspecifiedKeys(t *testing.T) {
-    fix := newFixture(t, stdout.Config{})
-
-    checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())
-
-    keys := []core.Key{key.New("C"), key.New("D")}
-
-    desc := metric.NewDescriptor("test.name", metric.CounterKind, core.Int64NumberKind, metric.WithKeys(keys...))
-    cagg := sum.New()
-    aggtest.CheckedUpdate(fix.t, cagg, core.NewInt64Number(10), &desc)
-    cagg.Checkpoint(fix.ctx, &desc)
-
-    checkpointSet.Add(&desc, cagg, key.String("A", "B"))
-
-    fix.Export(checkpointSet)
-
-    require.Equal(t, `{"updates":[{"name":"test.name{A=B,C,D}","sum":10}]}`, fix.Output())
-}
@@ -102,7 +102,6 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
    tests := []struct {
        name string
        metricKind metric.Kind
-        keys []core.Key
        description string
        unit unit.Unit
        numberKind core.NumberKind
@@ -112,7 +111,6 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
        {
            "mmsc-test-a",
            metric.MeasureKind,
-            []core.Key{},
            "test-a-description",
            unit.Dimensionless,
            core.Int64NumberKind,
@@ -128,7 +126,6 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
        {
            "mmsc-test-b",
            metric.CounterKind, // This shouldn't change anything.
-            []core.Key{"test"}, // This shouldn't change anything.
            "test-b-description",
            unit.Bytes,
            core.Float64NumberKind, // This shouldn't change anything.
@@ -151,7 +148,6 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
    mmsc.Checkpoint(ctx, &metric.Descriptor{})
    for _, test := range tests {
        desc := metric.NewDescriptor(test.name, test.metricKind, test.numberKind,
-            metric.WithKeys(test.keys...),
            metric.WithDescription(test.description),
            metric.WithUnit(test.unit))
        labels := export.NewSimpleLabels(export.NoopLabelEncoder{}, test.labels...)
@@ -208,7 +204,6 @@ func TestSumMetricDescriptor(t *testing.T) {
    tests := []struct {
        name string
        metricKind metric.Kind
-        keys []core.Key
        description string
        unit unit.Unit
        numberKind core.NumberKind
@@ -218,7 +213,6 @@ func TestSumMetricDescriptor(t *testing.T) {
        {
            "sum-test-a",
            metric.CounterKind,
-            []core.Key{},
            "test-a-description",
            unit.Dimensionless,
            core.Int64NumberKind,
@@ -234,7 +228,6 @@ func TestSumMetricDescriptor(t *testing.T) {
        {
            "sum-test-b",
            metric.MeasureKind, // This shouldn't change anything.
-            []core.Key{"test"}, // This shouldn't change anything.
            "test-b-description",
            unit.Milliseconds,
            core.Float64NumberKind,
@@ -251,7 +244,6 @@ func TestSumMetricDescriptor(t *testing.T) {

    for _, test := range tests {
        desc := metric.NewDescriptor(test.name, test.metricKind, test.numberKind,
-            metric.WithKeys(test.keys...),
            metric.WithDescription(test.description),
            metric.WithUnit(test.unit),
        )
@@ -1,165 +0,0 @@
-// Copyright The OpenTelemetry Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package defaultkeys // import "go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys"
-
-import (
-    "context"
-    "errors"
-
-    "go.opentelemetry.io/otel/api/core"
-    "go.opentelemetry.io/otel/api/metric"
-    export "go.opentelemetry.io/otel/sdk/export/metric"
-    "go.opentelemetry.io/otel/sdk/export/metric/aggregator"
-)
-
-type (
-    Batcher struct {
-        selector export.AggregationSelector
-        labelEncoder export.LabelEncoder
-        stateful bool
-        descKeyIndex descKeyIndexMap
-        aggCheckpoint aggCheckpointMap
-    }
-
-    // descKeyIndexMap is a mapping, for each Descriptor, from the
-    // Key to the position in the descriptor's recommended keys.
-    descKeyIndexMap map[*metric.Descriptor]map[core.Key]int
-
-    // batchKey describes a unique metric descriptor and encoded label set.
-    batchKey struct {
-        descriptor *metric.Descriptor
-        encoded string
-    }
-
-    // aggCheckpointMap is a mapping from batchKey to current
-    // export record. If the batcher is stateful, this map is
-    // never cleared.
-    aggCheckpointMap map[batchKey]export.Record
-
-    checkpointSet struct {
-        aggCheckpointMap aggCheckpointMap
-        labelEncoder export.LabelEncoder
-    }
-)
-
-var _ export.Batcher = &Batcher{}
-var _ export.CheckpointSet = &checkpointSet{}
-
-func New(selector export.AggregationSelector, labelEncoder export.LabelEncoder, stateful bool) *Batcher {
-    return &Batcher{
-        selector: selector,
-        labelEncoder: labelEncoder,
-        descKeyIndex: descKeyIndexMap{},
-        aggCheckpoint: aggCheckpointMap{},
-        stateful: stateful,
-    }
-}
-
-func (b *Batcher) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
-    return b.selector.AggregatorFor(descriptor)
-}
-
-func (b *Batcher) Process(_ context.Context, record export.Record) error {
-    desc := record.Descriptor()
-    keys := desc.Keys()
-
-    // Cache the mapping from Descriptor->Key->Index
-    ki, ok := b.descKeyIndex[desc]
-    if !ok {
-        ki = map[core.Key]int{}
-        b.descKeyIndex[desc] = ki
-
-        for i, k := range keys {
-            ki[k] = i
-        }
-    }
-
-    // Compute the value list. Note: Unspecified values become
-    // empty strings. TODO: pin this down, we have no appropriate
-    // Value constructor.
-    outputLabels := make([]core.KeyValue, len(keys))
-
-    for i, key := range keys {
-        outputLabels[i] = key.String("")
-    }
-
-    // Note also the possibility to speed this computation of
-    // "encoded" via "outputLabels" in the form of a (Descriptor,
-    // Labels)->(Labels, Encoded) cache.
-    iter := record.Labels().Iter()
-    for iter.Next() {
-        kv := iter.Label()
-        pos, ok := ki[kv.Key]
-        if !ok {
-            continue
-        }
-        outputLabels[pos].Value = kv.Value
-    }
-
-    // Compute an encoded lookup key.
-    elabels := export.NewSimpleLabels(b.labelEncoder, outputLabels...)
-    encoded := elabels.Encoded(b.labelEncoder)
-
-    // Merge this aggregator with all preceding aggregators that
-    // map to the same set of `outputLabels` labels.
-    agg := record.Aggregator()
-    key := batchKey{
-        descriptor: record.Descriptor(),
-        encoded: encoded,
-    }
-    rag, ok := b.aggCheckpoint[key]
-    if ok {
-        // Combine the input aggregator with the current
-        // checkpoint state.
-        return rag.Aggregator().Merge(agg, desc)
-    }
-    // If this Batcher is stateful, create a copy of the
-    // Aggregator for long-term storage. Otherwise the
-    // Meter implementation will checkpoint the aggregator
-    // again, overwriting the long-lived state.
-    if b.stateful {
-        tmp := agg
-        // Note: the call to AggregatorFor() followed by Merge
-        // is effectively a Clone() operation.
-        agg = b.AggregatorFor(desc)
-        if err := agg.Merge(tmp, desc); err != nil {
-            return err
-        }
-    }
-    b.aggCheckpoint[key] = export.NewRecord(desc, elabels, agg)
-    return nil
-}
-
-func (b *Batcher) CheckpointSet() export.CheckpointSet {
-    return &checkpointSet{
-        aggCheckpointMap: b.aggCheckpoint,
-        labelEncoder: b.labelEncoder,
-    }
-}
-
-func (b *Batcher) FinishedCollection() {
-    if !b.stateful {
-        b.aggCheckpoint = aggCheckpointMap{}
-    }
-}
-
-func (p *checkpointSet) ForEach(f func(export.Record) error) error {
-    for _, entry := range p.aggCheckpointMap {
-        if err := f(entry); err != nil && !errors.Is(err, aggregator.ErrNoData) {
-            return err
-        }
-    }
-    return nil
-}
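For reference, the core of the deleted batcher's Process method was a projection of each record's labels onto the descriptor's recommended keys before encoding, so records differing only in non-recommended labels merged into a single export record. A compact restatement of that projection step, using the names from the code above:

    outputLabels := make([]core.KeyValue, len(keys))
    for i, k := range keys {
        outputLabels[i] = k.String("") // unspecified recommended keys become empty values
    }
    iter := record.Labels().Iter()
    for iter.Next() {
        kv := iter.Label()
        if pos, ok := ki[kv.Key]; ok { // ki maps Key -> position in desc.Keys()
            outputLabels[pos].Value = kv.Value
        }
    }
    elabels := export.NewSimpleLabels(b.labelEncoder, outputLabels...)
    encoded := elabels.Encoded(b.labelEncoder) // grouping key within one descriptor

Without Descriptor.Keys() there is no per-instrument key set to project onto, so the batcher is removed rather than adapted.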
@@ -1,145 +0,0 @@
-// Copyright The OpenTelemetry Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package defaultkeys_test
-
-import (
-    "context"
-    "testing"
-
-    "github.com/stretchr/testify/require"
-
-    "go.opentelemetry.io/otel/api/core"
-    export "go.opentelemetry.io/otel/sdk/export/metric"
-    "go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys"
-    "go.opentelemetry.io/otel/sdk/metric/batcher/test"
-)
-
-func TestGroupingStateless(t *testing.T) {
-    ctx := context.Background()
-    b := defaultkeys.New(test.NewAggregationSelector(), test.GroupEncoder, false)
-
-    _ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 10))
-    _ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels2, 20))
-    _ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels3, 30))
-
-    _ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueBDesc, test.Labels1, 10))
-    _ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueBDesc, test.Labels2, 20))
-    _ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueBDesc, test.Labels3, 30))
-
-    _ = b.Process(ctx, test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10))
-    _ = b.Process(ctx, test.NewCounterRecord(&test.CounterADesc, test.Labels2, 20))
-    _ = b.Process(ctx, test.NewCounterRecord(&test.CounterADesc, test.Labels3, 40))
-
-    _ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 10))
-    _ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels2, 20))
-    _ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels3, 40))
-
-    checkpointSet := b.CheckpointSet()
-    b.FinishedCollection()
-
-    records := test.NewOutput(test.GroupEncoder)
-    err := checkpointSet.ForEach(records.AddTo)
-    require.NoError(t, err)
-
-    // Repeat for {counter,lastvalue}.{1,2}.
-    // Output lastvalue should have only the "G=H" and "G=" keys.
-    // Output counter should have only the "C=D" and "C=" keys.
-    require.EqualValues(t, map[string]float64{
-        "sum.a/C=D": 30, // labels1 + labels2
-        "sum.a/C=": 40, // labels3
-        "sum.b/C=D": 30, // labels1 + labels2
-        "sum.b/C=": 40, // labels3
-        "lastvalue.a/G=H": 10, // labels1
-        "lastvalue.a/G=": 30, // labels3 = last value
-        "lastvalue.b/G=H": 10, // labels1
-        "lastvalue.b/G=": 30, // labels3 = last value
-    }, records.Map)
-
-    // Verify that state is reset by FinishedCollection()
-    checkpointSet = b.CheckpointSet()
-    b.FinishedCollection()
-    _ = checkpointSet.ForEach(func(rec export.Record) error {
-        t.Fatal("Unexpected call")
-        return nil
-    })
-}
-
-func TestGroupingStateful(t *testing.T) {
-    ctx := context.Background()
-    b := defaultkeys.New(test.NewAggregationSelector(), test.GroupEncoder, true)
-
-    counterA := test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10)
-    caggA := counterA.Aggregator()
-    _ = b.Process(ctx, counterA)
-
-    counterB := test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 10)
-    caggB := counterB.Aggregator()
-    _ = b.Process(ctx, counterB)
-
-    checkpointSet := b.CheckpointSet()
-    b.FinishedCollection()
-
-    records1 := test.NewOutput(test.GroupEncoder)
-    err := checkpointSet.ForEach(records1.AddTo)
-    require.NoError(t, err)
-
-    require.EqualValues(t, map[string]float64{
-        "sum.a/C=D": 10, // labels1
-        "sum.b/C=D": 10, // labels1
-    }, records1.Map)
-
-    // Test that state was NOT reset
-    checkpointSet = b.CheckpointSet()
-    b.FinishedCollection()
-
-    records2 := test.NewOutput(test.GroupEncoder)
-    err = checkpointSet.ForEach(records2.AddTo)
-    require.NoError(t, err)
-
-    require.EqualValues(t, records1.Map, records2.Map)
-
-    // Update and re-checkpoint the original record.
-    _ = caggA.Update(ctx, core.NewInt64Number(20), &test.CounterADesc)
-    _ = caggB.Update(ctx, core.NewInt64Number(20), &test.CounterBDesc)
-    caggA.Checkpoint(ctx, &test.CounterADesc)
-    caggB.Checkpoint(ctx, &test.CounterBDesc)
-
-    // As yet cagg has not been passed to Batcher.Process. Should
-    // not see an update.
-    checkpointSet = b.CheckpointSet()
-    b.FinishedCollection()
-
-    records3 := test.NewOutput(test.GroupEncoder)
-    err = checkpointSet.ForEach(records3.AddTo)
-    require.NoError(t, err)
-
-    require.EqualValues(t, records1.Map, records3.Map)
-
-    // Now process the second update
-    _ = b.Process(ctx, export.NewRecord(&test.CounterADesc, test.Labels1, caggA))
-    _ = b.Process(ctx, export.NewRecord(&test.CounterBDesc, test.Labels1, caggB))
-
-    checkpointSet = b.CheckpointSet()
-    b.FinishedCollection()
-
-    records4 := test.NewOutput(test.GroupEncoder)
-    err = checkpointSet.ForEach(records4.AddTo)
-    require.NoError(t, err)
-
-    require.EqualValues(t, map[string]float64{
-        "sum.a/C=D": 30,
-        "sum.b/C=D": 30,
-    }, records4.Map)
-}
@@ -47,14 +47,14 @@ type (
var (
    // LastValueADesc and LastValueBDesc group by "G"
    LastValueADesc = metric.NewDescriptor(
-        "lastvalue.a", metric.ObserverKind, core.Int64NumberKind, metric.WithKeys(key.New("G")))
+        "lastvalue.a", metric.ObserverKind, core.Int64NumberKind)
    LastValueBDesc = metric.NewDescriptor(
-        "lastvalue.b", metric.ObserverKind, core.Int64NumberKind, metric.WithKeys(key.New("G")))
+        "lastvalue.b", metric.ObserverKind, core.Int64NumberKind)
    // CounterADesc and CounterBDesc group by "C"
    CounterADesc = metric.NewDescriptor(
-        "sum.a", metric.CounterKind, core.Int64NumberKind, metric.WithKeys(key.New("C")))
+        "sum.a", metric.CounterKind, core.Int64NumberKind)
    CounterBDesc = metric.NewDescriptor(
-        "sum.b", metric.CounterKind, core.Int64NumberKind, metric.WithKeys(key.New("C")))
+        "sum.b", metric.CounterKind, core.Int64NumberKind)

    // SdkEncoder uses a non-standard encoder like K1~V1&K2~V2
    SdkEncoder = &Encoder{}
@@ -39,7 +39,7 @@ func ExampleNew() {
    key := key.New("key")
    meter := pusher.Meter("example")

-    counter := metric.Must(meter).NewInt64Counter("a.counter", metric.WithKeys(key))
+    counter := metric.Must(meter).NewInt64Counter("a.counter")

    counter.Add(ctx, 100, key.String("value"))
