diff --git a/sdk/metric/batcher/defaultkeys/defaultkeys.go b/sdk/metric/batcher/defaultkeys/defaultkeys.go index a07c616bf..56da38a65 100644 --- a/sdk/metric/batcher/defaultkeys/defaultkeys.go +++ b/sdk/metric/batcher/defaultkeys/defaultkeys.go @@ -34,10 +34,16 @@ type ( // Key to the position in the descriptor's recommended keys. descKeyIndexMap map[*export.Descriptor]map[core.Key]int - // aggCheckpointMap is a mapping from encoded label set to current + // batchKey describes a unique metric descriptor and encoded label set. + batchKey struct { + descriptor *export.Descriptor + encoded string + } + + // aggCheckpointMap is a mapping from batchKey to current // export record. If the batcher is stateful, this map is // never cleared. - aggCheckpointMap map[string]export.Record + aggCheckpointMap map[batchKey]export.Record checkpointSet struct { aggCheckpointMap aggCheckpointMap @@ -103,7 +109,11 @@ func (b *Batcher) Process(_ context.Context, record export.Record) error { // Merge this aggregator with all preceding aggregators that // map to the same set of `outputLabels` labels. agg := record.Aggregator() - rag, ok := b.aggCheckpoint[encoded] + key := batchKey{ + descriptor: record.Descriptor(), + encoded: encoded, + } + rag, ok := b.aggCheckpoint[key] if ok { return rag.Aggregator().Merge(agg, desc) } @@ -118,7 +128,7 @@ func (b *Batcher) Process(_ context.Context, record export.Record) error { return err } } - b.aggCheckpoint[encoded] = export.NewRecord( + b.aggCheckpoint[key] = export.NewRecord( desc, export.NewLabels(outputLabels, encoded, b.labelEncoder), agg, diff --git a/sdk/metric/batcher/defaultkeys/defaultkeys_test.go b/sdk/metric/batcher/defaultkeys/defaultkeys_test.go index 67e03d807..9b886c763 100644 --- a/sdk/metric/batcher/defaultkeys/defaultkeys_test.go +++ b/sdk/metric/batcher/defaultkeys/defaultkeys_test.go @@ -30,13 +30,21 @@ func TestGroupingStateless(t *testing.T) { ctx := context.Background() b := defaultkeys.New(test.NewAggregationSelector(), test.GroupEncoder, false) - _ = b.Process(ctx, export.NewRecord(test.GaugeDesc, test.Labels1, test.GaugeAgg(10))) - _ = b.Process(ctx, export.NewRecord(test.GaugeDesc, test.Labels2, test.GaugeAgg(20))) - _ = b.Process(ctx, export.NewRecord(test.GaugeDesc, test.Labels3, test.GaugeAgg(30))) + _ = b.Process(ctx, test.NewGaugeRecord(test.GaugeADesc, test.Labels1, 10)) + _ = b.Process(ctx, test.NewGaugeRecord(test.GaugeADesc, test.Labels2, 20)) + _ = b.Process(ctx, test.NewGaugeRecord(test.GaugeADesc, test.Labels3, 30)) - _ = b.Process(ctx, export.NewRecord(test.CounterDesc, test.Labels1, test.CounterAgg(10))) - _ = b.Process(ctx, export.NewRecord(test.CounterDesc, test.Labels2, test.CounterAgg(20))) - _ = b.Process(ctx, export.NewRecord(test.CounterDesc, test.Labels3, test.CounterAgg(40))) + _ = b.Process(ctx, test.NewGaugeRecord(test.GaugeBDesc, test.Labels1, 10)) + _ = b.Process(ctx, test.NewGaugeRecord(test.GaugeBDesc, test.Labels2, 20)) + _ = b.Process(ctx, test.NewGaugeRecord(test.GaugeBDesc, test.Labels3, 30)) + + _ = b.Process(ctx, test.NewCounterRecord(test.CounterADesc, test.Labels1, 10)) + _ = b.Process(ctx, test.NewCounterRecord(test.CounterADesc, test.Labels2, 20)) + _ = b.Process(ctx, test.NewCounterRecord(test.CounterADesc, test.Labels3, 40)) + + _ = b.Process(ctx, test.NewCounterRecord(test.CounterBDesc, test.Labels1, 10)) + _ = b.Process(ctx, test.NewCounterRecord(test.CounterBDesc, test.Labels2, 20)) + _ = b.Process(ctx, test.NewCounterRecord(test.CounterBDesc, test.Labels3, 40)) checkpointSet := b.CheckpointSet() b.FinishedCollection() @@ -44,13 +52,18 @@ func TestGroupingStateless(t *testing.T) { records := test.Output{} checkpointSet.ForEach(records.AddTo) + // Repeat for {counter,gauge}.{1,2}. // Output gauge should have only the "G=H" and "G=" keys. // Output counter should have only the "C=D" and "C=" keys. require.EqualValues(t, map[string]int64{ - "counter/C=D": 30, // labels1 + labels2 - "counter/C=": 40, // labels3 - "gauge/G=H": 10, // labels1 - "gauge/G=": 30, // labels3 = last value + "counter.a/C=D": 30, // labels1 + labels2 + "counter.a/C=": 40, // labels3 + "counter.b/C=D": 30, // labels1 + labels2 + "counter.b/C=": 40, // labels3 + "gauge.a/G=H": 10, // labels1 + "gauge.a/G=": 30, // labels3 = last value + "gauge.b/G=H": 10, // labels1 + "gauge.b/G=": 30, // labels3 = last value }, records) // Verify that state is reset by FinishedCollection() @@ -65,8 +78,13 @@ func TestGroupingStateful(t *testing.T) { ctx := context.Background() b := defaultkeys.New(test.NewAggregationSelector(), test.GroupEncoder, true) - cagg := test.CounterAgg(10) - _ = b.Process(ctx, export.NewRecord(test.CounterDesc, test.Labels1, cagg)) + counterA := test.NewCounterRecord(test.CounterADesc, test.Labels1, 10) + caggA := counterA.Aggregator() + _ = b.Process(ctx, counterA) + + counterB := test.NewCounterRecord(test.CounterBDesc, test.Labels1, 10) + caggB := counterB.Aggregator() + _ = b.Process(ctx, counterB) checkpointSet := b.CheckpointSet() b.FinishedCollection() @@ -75,7 +93,8 @@ func TestGroupingStateful(t *testing.T) { checkpointSet.ForEach(records1.AddTo) require.EqualValues(t, map[string]int64{ - "counter/C=D": 10, // labels1 + "counter.a/C=D": 10, // labels1 + "counter.b/C=D": 10, // labels1 }, records1) // Test that state was NOT reset @@ -88,8 +107,10 @@ func TestGroupingStateful(t *testing.T) { require.EqualValues(t, records1, records2) // Update and re-checkpoint the original record. - _ = cagg.Update(ctx, core.NewInt64Number(20), test.CounterDesc) - cagg.Checkpoint(ctx, test.CounterDesc) + _ = caggA.Update(ctx, core.NewInt64Number(20), test.CounterADesc) + _ = caggB.Update(ctx, core.NewInt64Number(20), test.CounterBDesc) + caggA.Checkpoint(ctx, test.CounterADesc) + caggB.Checkpoint(ctx, test.CounterBDesc) // As yet cagg has not been passed to Batcher.Process. Should // not see an update. @@ -102,7 +123,8 @@ func TestGroupingStateful(t *testing.T) { require.EqualValues(t, records1, records3) // Now process the second update - _ = b.Process(ctx, export.NewRecord(test.CounterDesc, test.Labels1, cagg)) + _ = b.Process(ctx, export.NewRecord(test.CounterADesc, test.Labels1, caggA)) + _ = b.Process(ctx, export.NewRecord(test.CounterBDesc, test.Labels1, caggB)) checkpointSet = b.CheckpointSet() b.FinishedCollection() @@ -111,6 +133,7 @@ func TestGroupingStateful(t *testing.T) { checkpointSet.ForEach(records4.AddTo) require.EqualValues(t, map[string]int64{ - "counter/C=D": 30, + "counter.a/C=D": 30, + "counter.b/C=D": 30, }, records4) } diff --git a/sdk/metric/batcher/test/test.go b/sdk/metric/batcher/test/test.go index 6f56949c8..69941e0e1 100644 --- a/sdk/metric/batcher/test/test.go +++ b/sdk/metric/batcher/test/test.go @@ -41,12 +41,16 @@ type ( ) var ( - // GaugeDesc groups by "G" - GaugeDesc = export.NewDescriptor( - "gauge", export.GaugeKind, []core.Key{key.New("G")}, "", "", core.Int64NumberKind, false) - // CounterDesc groups by "C" - CounterDesc = export.NewDescriptor( - "counter", export.CounterKind, []core.Key{key.New("C")}, "", "", core.Int64NumberKind, false) + // GaugeADesc and GaugeBDesc group by "G" + GaugeADesc = export.NewDescriptor( + "gauge.a", export.GaugeKind, []core.Key{key.New("G")}, "", "", core.Int64NumberKind, false) + GaugeBDesc = export.NewDescriptor( + "gauge.b", export.GaugeKind, []core.Key{key.New("G")}, "", "", core.Int64NumberKind, false) + // CounterADesc and CounterBDesc group by "C" + CounterADesc = export.NewDescriptor( + "counter.a", export.CounterKind, []core.Key{key.New("C")}, "", "", core.Int64NumberKind, false) + CounterBDesc = export.NewDescriptor( + "counter.b", export.CounterKind, []core.Key{key.New("C")}, "", "", core.Int64NumberKind, false) // SdkEncoder uses a non-standard encoder like K1~V1&K2~V2 SdkEncoder = &Encoder{} @@ -100,21 +104,31 @@ func (Encoder) Encode(labels []core.KeyValue) string { return sb.String() } -// GaugeAgg returns a checkpointed gauge aggregator w/ the specified value. -func GaugeAgg(v int64) export.Aggregator { +// GaugeAgg returns a checkpointed gauge aggregator w/ the specified descriptor and value. +func GaugeAgg(desc *export.Descriptor, v int64) export.Aggregator { ctx := context.Background() gagg := gauge.New() - _ = gagg.Update(ctx, core.NewInt64Number(v), GaugeDesc) - gagg.Checkpoint(ctx, CounterDesc) + _ = gagg.Update(ctx, core.NewInt64Number(v), desc) + gagg.Checkpoint(ctx, desc) return gagg } -// CounterAgg returns a checkpointed counter aggregator w/ the specified value. -func CounterAgg(v int64) export.Aggregator { +// Convenience method for building a test exported gauge record. +func NewGaugeRecord(desc *export.Descriptor, labels export.Labels, value int64) export.Record { + return export.NewRecord(desc, labels, GaugeAgg(desc, value)) +} + +// Convenience method for building a test exported counter record. +func NewCounterRecord(desc *export.Descriptor, labels export.Labels, value int64) export.Record { + return export.NewRecord(desc, labels, CounterAgg(desc, value)) +} + +// CounterAgg returns a checkpointed counter aggregator w/ the specified descriptor and value. +func CounterAgg(desc *export.Descriptor, v int64) export.Aggregator { ctx := context.Background() cagg := counter.New() - _ = cagg.Update(ctx, core.NewInt64Number(v), CounterDesc) - cagg.Checkpoint(ctx, CounterDesc) + _ = cagg.Update(ctx, core.NewInt64Number(v), desc) + cagg.Checkpoint(ctx, desc) return cagg } diff --git a/sdk/metric/batcher/ungrouped/ungrouped_test.go b/sdk/metric/batcher/ungrouped/ungrouped_test.go index b1c556943..461da7597 100644 --- a/sdk/metric/batcher/ungrouped/ungrouped_test.go +++ b/sdk/metric/batcher/ungrouped/ungrouped_test.go @@ -33,20 +33,30 @@ func TestUngroupedStateless(t *testing.T) { b := ungrouped.New(test.NewAggregationSelector(), false) // Set initial gauge values - _ = b.Process(ctx, export.NewRecord(test.GaugeDesc, test.Labels1, test.GaugeAgg(10))) - _ = b.Process(ctx, export.NewRecord(test.GaugeDesc, test.Labels2, test.GaugeAgg(20))) - _ = b.Process(ctx, export.NewRecord(test.GaugeDesc, test.Labels3, test.GaugeAgg(30))) + _ = b.Process(ctx, test.NewGaugeRecord(test.GaugeADesc, test.Labels1, 10)) + _ = b.Process(ctx, test.NewGaugeRecord(test.GaugeADesc, test.Labels2, 20)) + _ = b.Process(ctx, test.NewGaugeRecord(test.GaugeADesc, test.Labels3, 30)) + + _ = b.Process(ctx, test.NewGaugeRecord(test.GaugeBDesc, test.Labels1, 10)) + _ = b.Process(ctx, test.NewGaugeRecord(test.GaugeBDesc, test.Labels2, 20)) + _ = b.Process(ctx, test.NewGaugeRecord(test.GaugeBDesc, test.Labels3, 30)) // Another gauge Set for Labels1 - _ = b.Process(ctx, export.NewRecord(test.GaugeDesc, test.Labels1, test.GaugeAgg(50))) + _ = b.Process(ctx, test.NewGaugeRecord(test.GaugeADesc, test.Labels1, 50)) + _ = b.Process(ctx, test.NewGaugeRecord(test.GaugeBDesc, test.Labels1, 50)) // Set initial counter values - _ = b.Process(ctx, export.NewRecord(test.CounterDesc, test.Labels1, test.CounterAgg(10))) - _ = b.Process(ctx, export.NewRecord(test.CounterDesc, test.Labels2, test.CounterAgg(20))) - _ = b.Process(ctx, export.NewRecord(test.CounterDesc, test.Labels3, test.CounterAgg(40))) + _ = b.Process(ctx, test.NewCounterRecord(test.CounterADesc, test.Labels1, 10)) + _ = b.Process(ctx, test.NewCounterRecord(test.CounterADesc, test.Labels2, 20)) + _ = b.Process(ctx, test.NewCounterRecord(test.CounterADesc, test.Labels3, 40)) + + _ = b.Process(ctx, test.NewCounterRecord(test.CounterBDesc, test.Labels1, 10)) + _ = b.Process(ctx, test.NewCounterRecord(test.CounterBDesc, test.Labels2, 20)) + _ = b.Process(ctx, test.NewCounterRecord(test.CounterBDesc, test.Labels3, 40)) // Another counter Add for Labels1 - _ = b.Process(ctx, export.NewRecord(test.CounterDesc, test.Labels1, test.CounterAgg(50))) + _ = b.Process(ctx, test.NewCounterRecord(test.CounterADesc, test.Labels1, 50)) + _ = b.Process(ctx, test.NewCounterRecord(test.CounterBDesc, test.Labels1, 50)) checkpointSet := b.CheckpointSet() b.FinishedCollection() @@ -57,12 +67,18 @@ func TestUngroupedStateless(t *testing.T) { // Output gauge should have only the "G=H" and "G=" keys. // Output counter should have only the "C=D" and "C=" keys. require.EqualValues(t, map[string]int64{ - "counter/G~H&C~D": 60, // labels1 - "counter/C~D&E~F": 20, // labels2 - "counter/": 40, // labels3 - "gauge/G~H&C~D": 50, // labels1 - "gauge/C~D&E~F": 20, // labels2 - "gauge/": 30, // labels3 + "counter.a/G~H&C~D": 60, // labels1 + "counter.a/C~D&E~F": 20, // labels2 + "counter.a/": 40, // labels3 + "counter.b/G~H&C~D": 60, // labels1 + "counter.b/C~D&E~F": 20, // labels2 + "counter.b/": 40, // labels3 + "gauge.a/G~H&C~D": 50, // labels1 + "gauge.a/C~D&E~F": 20, // labels2 + "gauge.a/": 30, // labels3 + "gauge.b/G~H&C~D": 50, // labels1 + "gauge.b/C~D&E~F": 20, // labels2 + "gauge.b/": 30, // labels3 }, records) // Verify that state was reset @@ -77,8 +93,13 @@ func TestUngroupedStateful(t *testing.T) { ctx := context.Background() b := ungrouped.New(test.NewAggregationSelector(), true) - cagg := test.CounterAgg(10) - _ = b.Process(ctx, export.NewRecord(test.CounterDesc, test.Labels1, cagg)) + counterA := test.NewCounterRecord(test.CounterADesc, test.Labels1, 10) + caggA := counterA.Aggregator() + _ = b.Process(ctx, counterA) + + counterB := test.NewCounterRecord(test.CounterBDesc, test.Labels1, 10) + caggB := counterB.Aggregator() + _ = b.Process(ctx, counterB) checkpointSet := b.CheckpointSet() b.FinishedCollection() @@ -87,7 +108,8 @@ func TestUngroupedStateful(t *testing.T) { checkpointSet.ForEach(records1.AddTo) require.EqualValues(t, map[string]int64{ - "counter/G~H&C~D": 10, // labels1 + "counter.a/G~H&C~D": 10, // labels1 + "counter.b/G~H&C~D": 10, // labels1 }, records1) // Test that state was NOT reset @@ -100,8 +122,10 @@ func TestUngroupedStateful(t *testing.T) { require.EqualValues(t, records1, records2) // Update and re-checkpoint the original record. - _ = cagg.Update(ctx, core.NewInt64Number(20), test.CounterDesc) - cagg.Checkpoint(ctx, test.CounterDesc) + _ = caggA.Update(ctx, core.NewInt64Number(20), test.CounterADesc) + _ = caggB.Update(ctx, core.NewInt64Number(20), test.CounterBDesc) + caggA.Checkpoint(ctx, test.CounterADesc) + caggB.Checkpoint(ctx, test.CounterBDesc) // As yet cagg has not been passed to Batcher.Process. Should // not see an update. @@ -114,7 +138,8 @@ func TestUngroupedStateful(t *testing.T) { require.EqualValues(t, records1, records3) // Now process the second update - _ = b.Process(ctx, export.NewRecord(test.CounterDesc, test.Labels1, cagg)) + _ = b.Process(ctx, export.NewRecord(test.CounterADesc, test.Labels1, caggA)) + _ = b.Process(ctx, export.NewRecord(test.CounterBDesc, test.Labels1, caggB)) checkpointSet = b.CheckpointSet() b.FinishedCollection() @@ -123,6 +148,7 @@ func TestUngroupedStateful(t *testing.T) { checkpointSet.ForEach(records4.AddTo) require.EqualValues(t, map[string]int64{ - "counter/G~H&C~D": 30, + "counter.a/G~H&C~D": 30, + "counter.b/G~H&C~D": 30, }, records4) }