1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-05-13 21:56:48 +02:00

Change Metric Processor to merge multiple observations (#1024)

* Add regexp filter in api/label, test

* Add regexp option to sdk.Config

* Return indistinct values only when keyRe != nil

* Filter in sdk

* Add an accumulator filter test

* SDK tests pass

* Precommit

* Undo set filters

* Backout related filter changes

* Add a new test

* Fix build

* Apply suggestions from code review

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>

* Update comments

* Apply suggestions from code review

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
Joshua MacDonald 2020-08-11 10:25:47 -07:00 committed by GitHub
parent 8f9f2d84cf
commit 3a05cd9325
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 183 additions and 179 deletions

View File

@ -41,7 +41,7 @@ func TestPullNoCache(t *testing.T) {
ctx := context.Background() ctx := context.Background()
meter := puller.Provider().Meter("nocache") meter := puller.Provider().Meter("nocache")
counter := metric.Must(meter).NewInt64Counter("counter") counter := metric.Must(meter).NewInt64Counter("counter.sum")
counter.Add(ctx, 10, kv.String("A", "B")) counter.Add(ctx, 10, kv.String("A", "B"))
@ -50,8 +50,8 @@ func TestPullNoCache(t *testing.T) {
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))
require.EqualValues(t, map[string]float64{ require.EqualValues(t, map[string]float64{
"counter/A=B/": 10, "counter.sum/A=B/": 10,
}, records.Map) }, records.Map())
counter.Add(ctx, 10, kv.String("A", "B")) counter.Add(ctx, 10, kv.String("A", "B"))
@ -60,8 +60,8 @@ func TestPullNoCache(t *testing.T) {
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))
require.EqualValues(t, map[string]float64{ require.EqualValues(t, map[string]float64{
"counter/A=B/": 20, "counter.sum/A=B/": 20,
}, records.Map) }, records.Map())
} }
func TestPullWithCache(t *testing.T) { func TestPullWithCache(t *testing.T) {
@ -75,7 +75,7 @@ func TestPullWithCache(t *testing.T) {
ctx := context.Background() ctx := context.Background()
meter := puller.Provider().Meter("nocache") meter := puller.Provider().Meter("nocache")
counter := metric.Must(meter).NewInt64Counter("counter") counter := metric.Must(meter).NewInt64Counter("counter.sum")
counter.Add(ctx, 10, kv.String("A", "B")) counter.Add(ctx, 10, kv.String("A", "B"))
@ -84,8 +84,8 @@ func TestPullWithCache(t *testing.T) {
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))
require.EqualValues(t, map[string]float64{ require.EqualValues(t, map[string]float64{
"counter/A=B/": 10, "counter.sum/A=B/": 10,
}, records.Map) }, records.Map())
counter.Add(ctx, 10, kv.String("A", "B")) counter.Add(ctx, 10, kv.String("A", "B"))
@ -95,8 +95,8 @@ func TestPullWithCache(t *testing.T) {
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))
require.EqualValues(t, map[string]float64{ require.EqualValues(t, map[string]float64{
"counter/A=B/": 10, "counter.sum/A=B/": 10,
}, records.Map) }, records.Map())
mock.Add(time.Second) mock.Add(time.Second)
runtime.Gosched() runtime.Gosched()
@ -107,7 +107,7 @@ func TestPullWithCache(t *testing.T) {
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))
require.EqualValues(t, map[string]float64{ require.EqualValues(t, map[string]float64{
"counter/A=B/": 20, "counter.sum/A=B/": 20,
}, records.Map) }, records.Map())
} }

View File

@ -364,7 +364,7 @@ func TestObserverCollection(t *testing.T) {
"float.updownsumobserver.sum/C=D/R=V": 1, "float.updownsumobserver.sum/C=D/R=V": 1,
"int.updownsumobserver.sum//R=V": -1, "int.updownsumobserver.sum//R=V": -1,
"int.updownsumobserver.sum/A=B/R=V": 1, "int.updownsumobserver.sum/A=B/R=V": 1,
}, out.Map) }, out.Map())
} }
func TestSumObserverInputRange(t *testing.T) { func TestSumObserverInputRange(t *testing.T) {
@ -467,7 +467,7 @@ func TestObserverBatch(t *testing.T) {
"float.valueobserver.lastvalue/C=D/R=V": -1, "float.valueobserver.lastvalue/C=D/R=V": -1,
"int.valueobserver.lastvalue//R=V": 1, "int.valueobserver.lastvalue//R=V": 1,
"int.valueobserver.lastvalue/A=B/R=V": 1, "int.valueobserver.lastvalue/A=B/R=V": 1,
}, out.Map) }, out.Map())
} }
func TestRecordBatch(t *testing.T) { func TestRecordBatch(t *testing.T) {
@ -502,7 +502,7 @@ func TestRecordBatch(t *testing.T) {
"float64.sum/A=B,C=D/R=V": 2, "float64.sum/A=B,C=D/R=V": 2,
"int64.exact/A=B,C=D/R=V": 3, "int64.exact/A=B,C=D/R=V": 3,
"float64.exact/A=B,C=D/R=V": 4, "float64.exact/A=B,C=D/R=V": 4,
}, out.Map) }, out.Map())
} }
// TestRecordPersistence ensures that a direct-called instrument that // TestRecordPersistence ensures that a direct-called instrument that
@ -582,5 +582,5 @@ func TestSyncInAsync(t *testing.T) {
require.EqualValues(t, map[string]float64{ require.EqualValues(t, map[string]float64{
"counter.sum//R=V": 100, "counter.sum//R=V": 100,
"observer.lastvalue//R=V": 10, "observer.lastvalue//R=V": 10,
}, out.Map) }, out.Map())
} }

View File

@ -67,28 +67,29 @@ type (
// being maintained, taken from the process start time. // being maintained, taken from the process start time.
stateful bool stateful bool
// TODO: as seen in lengthy comments below, both the // currentOwned indicates that "current" was allocated
// `current` and `delta` fields have multiple uses // by the processor in order to merge results from
// depending on the specific configuration of // multiple Accumulators during a single collection
// instrument, exporter, and accumulator. It is // round, which may happen either because:
// possible to simplify this situation by declaring // (1) multiple Accumulators output the same Accumulation.
// explicit fields that are not used with a dual // (2) one Accumulator is configured with dimensionality reduction.
// purpose. Improve this situation? currentOwned bool
//
// 1. "delta" is used to combine deltas from multiple
// accumulators, and it is also used to store the
// output of subtraction when computing deltas of
// PrecomputedSum instruments.
//
// 2. "current" either refers to the Aggregator passed
// to Process() by a single accumulator (when either
// there is just one Accumulator, or the instrument is
// Asynchronous), or it refers to "delta", depending
// on configuration.
current export.Aggregator // refers to single-accumulator checkpoint or delta. // current refers to the output from a single Accumulator
delta export.Aggregator // owned if multi accumulator else nil. // (if !currentOwned) or it refers to an Aggregator
cumulative export.Aggregator // owned if stateful else nil. // owned by the processor used to accumulate multiple
// values in a single collection round.
current export.Aggregator
// delta, if non-nil, refers to an Aggregator owned by
// the processor used to compute deltas between
// precomputed sums.
delta export.Aggregator
// cumulative, if non-nil, refers to an Aggregator owned
// by the processor used to store the last cumulative
// value.
cumulative export.Aggregator
} }
state struct { state struct {
@ -172,10 +173,8 @@ func (b *Processor) Process(accum export.Accumulation) error {
// If we know we need to compute deltas, allocate two aggregators. // If we know we need to compute deltas, allocate two aggregators.
b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta) b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta)
} else { } else {
// In this case we are not certain to need a delta, only allocate a // In this case we are certain not to need a delta, only allocate
// cumulative aggregator. We _may_ need a delta accumulator if // a cumulative aggregator.
// multiple synchronous Accumulators produce an Accumulation (handled
// below), which requires merging them into a temporary Aggregator.
b.AggregatorFor(desc, &newValue.cumulative) b.AggregatorFor(desc, &newValue.cumulative)
} }
} }
@ -212,71 +211,36 @@ func (b *Processor) Process(accum export.Accumulation) error {
// Case (b) occurs when the variable `sameCollection` is true, // Case (b) occurs when the variable `sameCollection` is true,
// indicating that the stateKey for Accumulation has already // indicating that the stateKey for Accumulation has already
// been seen in the same collection. When this happens, it // been seen in the same collection. When this happens, it
// implies that multiple Accumulators are being used because // implies that multiple Accumulators are being used, or that
// the Accumulator outputs a maximum of one Accumulation per // a single Accumulator has been configured with a label key
// instrument and label set. // filter.
//
// The following logic distinguishes between asynchronous and
// synchronous instruments in order to ensure that the use of
// multiple Accumulators does not change instrument semantics.
// To maintain the instrument semantics, multiple synchronous
// Accumulations should be merged, whereas when multiple
// asynchronous Accumulations are processed, the last value
// should be kept.
if !sameCollection { if !sameCollection {
// This is the first Accumulation we've seen for this if !value.currentOwned {
// stateKey during this collection. Just keep a // This is the first Accumulation we've seen
// reference to the Accumulator's Aggregator. // for this stateKey during this collection.
value.current = agg // Just keep a reference to the Accumulator's
return nil // Aggregator. All the other cases copy
} // Aggregator state.
if desc.MetricKind().Asynchronous() { value.current = agg
// The last value across multiple accumulators is taken. return nil
// Just keep a reference to the Accumulator's Aggregator. }
value.current = agg return agg.SynchronizedMove(value.current, desc)
return nil
} }
// The above two cases are keeping a reference to the // If the current is not owned, take ownership of a copy
// Accumulator's Aggregator. The remaining cases address // before merging below.
// synchronous instruments, which always merge multiple if !value.currentOwned {
// Accumulations using `value.delta` for temporary storage. tmp := value.current
b.AggregatorSelector.AggregatorFor(desc, &value.current)
if value.delta == nil { value.currentOwned = true
// The temporary `value.delta` may have been allocated if err := tmp.SynchronizedMove(value.current, desc); err != nil {
// already, either in a prior pass through this block of
// code or in the `!ok` branch above. It would be
// allocated in the `!ok` branch if this is stateful
// PrecomputedSum instrument (in which case the exporter
// is requesting a delta so we allocate it up front),
// and it would be allocated in this block when multiple
// accumulators are used and the first condition is not
// met.
b.AggregatorSelector.AggregatorFor(desc, &value.delta)
}
if value.current != value.delta {
// If the current and delta Aggregators are not the same it
// implies that multiple Accumulators were used. The first
// Accumulation seen for a given stateKey will return in
// one of the cases above after assigning `value.current
// = agg` (i.e., after taking a reference to the
// Accumulator's Aggregator).
//
// The second time through this branch copies the
// Accumulator's Aggregator into `value.delta` and sets
// `value.current` appropriately to avoid this branch if
// a third Accumulator is used.
err := value.current.SynchronizedMove(value.delta, desc)
if err != nil {
return err return err
} }
value.current = value.delta
} }
// The two statements above ensures that `value.current` refers
// to `value.delta` and not to an Accumulator's Aggregator. Now // Combine this Accumulation with the prior Accumulation.
// combine this Accumulation with the prior Accumulation. return value.current.Merge(agg, desc)
return value.delta.Merge(agg, desc)
} }
// CheckpointSet returns the associated CheckpointSet. Use the // CheckpointSet returns the associated CheckpointSet. Use the

View File

@ -18,6 +18,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strings"
"testing" "testing"
"time" "time"
@ -29,14 +30,8 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric" export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/export/metric/metrictest" "go.opentelemetry.io/otel/sdk/export/metric/metrictest"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/processor/basic" "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/processor/processortest" processorTest "go.opentelemetry.io/otel/sdk/metric/processor/processortest"
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
) )
@ -101,29 +96,6 @@ func TestProcessor(t *testing.T) {
} }
} }
type testSelector struct {
kind aggregation.Kind
}
func (ts testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) {
for i := range aggPtrs {
switch ts.kind {
case aggregation.SumKind:
*aggPtrs[i] = &sum.New(1)[0]
case aggregation.MinMaxSumCountKind:
*aggPtrs[i] = &minmaxsumcount.New(1, desc)[0]
case aggregation.HistogramKind:
*aggPtrs[i] = &histogram.New(1, desc, nil)[0]
case aggregation.LastValueKind:
*aggPtrs[i] = &lastvalue.New(1)[0]
case aggregation.SketchKind:
*aggPtrs[i] = &ddsketch.New(1, desc, nil)[0]
case aggregation.ExactKind:
*aggPtrs[i] = &array.New(1)[0]
}
}
}
func asNumber(nkind metric.NumberKind, value int64) metric.Number { func asNumber(nkind metric.NumberKind, value int64) metric.Number {
if nkind == metric.Int64NumberKind { if nkind == metric.Int64NumberKind {
return metric.NewInt64Number(value) return metric.NewInt64Number(value)
@ -147,18 +119,22 @@ func testProcessor(
nkind metric.NumberKind, nkind metric.NumberKind,
akind aggregation.Kind, akind aggregation.Kind,
) { ) {
selector := testSelector{akind} // Note: this selector uses the instrument name to dictate
// aggregation kind.
selector := processorTest.AggregatorSelector()
res := resource.New(kv.String("R", "V")) res := resource.New(kv.String("R", "V"))
labs1 := []kv.KeyValue{kv.String("L1", "V")} labs1 := []kv.KeyValue{kv.String("L1", "V")}
labs2 := []kv.KeyValue{kv.String("L2", "V")} labs2 := []kv.KeyValue{kv.String("L2", "V")}
desc1 := metric.NewDescriptor("inst1", mkind, nkind)
desc2 := metric.NewDescriptor("inst2", mkind, nkind)
testBody := func(t *testing.T, hasMemory bool, nAccum, nCheckpoint int) { testBody := func(t *testing.T, hasMemory bool, nAccum, nCheckpoint int) {
processor := basic.New(selector, ekind, basic.WithMemory(hasMemory)) processor := basic.New(selector, ekind, basic.WithMemory(hasMemory))
instSuffix := fmt.Sprint(".", strings.ToLower(akind.String()))
desc1 := metric.NewDescriptor(fmt.Sprint("inst1", instSuffix), mkind, nkind)
desc2 := metric.NewDescriptor(fmt.Sprint("inst2", instSuffix), mkind, nkind)
for nc := 0; nc < nCheckpoint; nc++ { for nc := 0; nc < nCheckpoint; nc++ {
// The input is 10 per update, scaled by // The input is 10 per update, scaled by
@ -207,7 +183,7 @@ func testProcessor(
} }
// Test the final checkpoint state. // Test the final checkpoint state.
records1 := processortest.NewOutput(label.DefaultEncoder()) records1 := processorTest.NewOutput(label.DefaultEncoder())
err = checkpointSet.ForEach(ekind, records1.AddRecord) err = checkpointSet.ForEach(ekind, records1.AddRecord)
// Test for an allowed error: // Test for an allowed error:
@ -217,19 +193,24 @@ func testProcessor(
var multiplier int64 var multiplier int64
if mkind.Asynchronous() { if mkind.Asynchronous() {
// Because async instruments take the last value, // Asynchronous tests accumulate results multiply by the
// the number of accumulators doesn't matter. // number of Accumulators, unless LastValue aggregation.
// If a precomputed sum, we expect cumulative inputs.
if mkind.PrecomputedSum() { if mkind.PrecomputedSum() {
if ekind == export.DeltaExporter { if ekind == export.DeltaExporter && akind != aggregation.LastValueKind {
multiplier = 1 multiplier = int64(nAccum)
} else { } else if akind == aggregation.LastValueKind {
multiplier = cumulativeMultiplier multiplier = cumulativeMultiplier
} else {
multiplier = cumulativeMultiplier * int64(nAccum)
} }
} else { } else {
if ekind == export.CumulativeExporter && akind != aggregation.LastValueKind { if ekind == export.CumulativeExporter && akind != aggregation.LastValueKind {
multiplier = cumulativeMultiplier multiplier = cumulativeMultiplier * int64(nAccum)
} else { } else if akind == aggregation.LastValueKind {
multiplier = 1 multiplier = 1
} else {
multiplier = int64(nAccum)
} }
} }
} else { } else {
@ -249,11 +230,12 @@ func testProcessor(
exp := map[string]float64{} exp := map[string]float64{}
if hasMemory || !repetitionAfterEmptyInterval { if hasMemory || !repetitionAfterEmptyInterval {
exp = map[string]float64{ exp = map[string]float64{
"inst1/L1=V/R=V": float64(multiplier * 10), // labels1 fmt.Sprintf("inst1%s/L1=V/R=V", instSuffix): float64(multiplier * 10), // labels1
"inst2/L2=V/R=V": float64(multiplier * 10), // labels2 fmt.Sprintf("inst2%s/L2=V/R=V", instSuffix): float64(multiplier * 10), // labels2
} }
} }
require.EqualValues(t, exp, records1.Map, "with repetition=%v", repetitionAfterEmptyInterval)
require.EqualValues(t, exp, records1.Map(), "with repetition=%v", repetitionAfterEmptyInterval)
} }
} }
} }
@ -287,19 +269,19 @@ func (bogusExporter) Export(context.Context, export.CheckpointSet) error {
func TestBasicInconsistent(t *testing.T) { func TestBasicInconsistent(t *testing.T) {
// Test double-start // Test double-start
b := basic.New(processortest.AggregatorSelector(), export.PassThroughExporter) b := basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter)
b.StartCollection() b.StartCollection()
b.StartCollection() b.StartCollection()
require.Equal(t, basic.ErrInconsistentState, b.FinishCollection()) require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())
// Test finish without start // Test finish without start
b = basic.New(processortest.AggregatorSelector(), export.PassThroughExporter) b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter)
require.Equal(t, basic.ErrInconsistentState, b.FinishCollection()) require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())
// Test no finish // Test no finish
b = basic.New(processortest.AggregatorSelector(), export.PassThroughExporter) b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter)
b.StartCollection() b.StartCollection()
require.Equal( require.Equal(
@ -312,14 +294,14 @@ func TestBasicInconsistent(t *testing.T) {
) )
// Test no start // Test no start
b = basic.New(processortest.AggregatorSelector(), export.PassThroughExporter) b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter)
desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind) desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind)
accum := export.NewAccumulation(&desc, label.EmptySet(), resource.Empty(), metrictest.NoopAggregator{}) accum := export.NewAccumulation(&desc, label.EmptySet(), resource.Empty(), metrictest.NoopAggregator{})
require.Equal(t, basic.ErrInconsistentState, b.Process(accum)) require.Equal(t, basic.ErrInconsistentState, b.Process(accum))
// Test invalid kind: // Test invalid kind:
b = basic.New(processortest.AggregatorSelector(), export.PassThroughExporter) b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter)
b.StartCollection() b.StartCollection()
require.NoError(t, b.Process(accum)) require.NoError(t, b.Process(accum))
require.NoError(t, b.FinishCollection()) require.NoError(t, b.FinishCollection())
@ -334,7 +316,7 @@ func TestBasicInconsistent(t *testing.T) {
func TestBasicTimestamps(t *testing.T) { func TestBasicTimestamps(t *testing.T) {
beforeNew := time.Now() beforeNew := time.Now()
b := basic.New(processortest.AggregatorSelector(), export.PassThroughExporter) b := basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter)
afterNew := time.Now() afterNew := time.Now()
desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind) desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind)
@ -383,8 +365,8 @@ func TestStatefulNoMemoryCumulative(t *testing.T) {
res := resource.New(kv.String("R", "V")) res := resource.New(kv.String("R", "V"))
ekind := export.CumulativeExporter ekind := export.CumulativeExporter
desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind) desc := metric.NewDescriptor("inst.sum", metric.CounterKind, metric.Int64NumberKind)
selector := testSelector{aggregation.SumKind} selector := processorTest.AggregatorSelector()
processor := basic.New(selector, ekind, basic.WithMemory(false)) processor := basic.New(selector, ekind, basic.WithMemory(false))
checkpointSet := processor.CheckpointSet() checkpointSet := processor.CheckpointSet()
@ -395,9 +377,9 @@ func TestStatefulNoMemoryCumulative(t *testing.T) {
require.NoError(t, processor.FinishCollection()) require.NoError(t, processor.FinishCollection())
// Verify zero elements // Verify zero elements
records := processortest.NewOutput(label.DefaultEncoder()) records := processorTest.NewOutput(label.DefaultEncoder())
require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord)) require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord))
require.EqualValues(t, map[string]float64{}, records.Map) require.EqualValues(t, map[string]float64{}, records.Map())
// Add 10 // Add 10
processor.StartCollection() processor.StartCollection()
@ -405,11 +387,11 @@ func TestStatefulNoMemoryCumulative(t *testing.T) {
require.NoError(t, processor.FinishCollection()) require.NoError(t, processor.FinishCollection())
// Verify one element // Verify one element
records = processortest.NewOutput(label.DefaultEncoder()) records = processorTest.NewOutput(label.DefaultEncoder())
require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord)) require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord))
require.EqualValues(t, map[string]float64{ require.EqualValues(t, map[string]float64{
"inst/A=B/R=V": float64(i * 10), "inst.sum/A=B/R=V": float64(i * 10),
}, records.Map) }, records.Map())
} }
} }
@ -417,8 +399,8 @@ func TestStatefulNoMemoryDelta(t *testing.T) {
res := resource.New(kv.String("R", "V")) res := resource.New(kv.String("R", "V"))
ekind := export.DeltaExporter ekind := export.DeltaExporter
desc := metric.NewDescriptor("inst", metric.SumObserverKind, metric.Int64NumberKind) desc := metric.NewDescriptor("inst.sum", metric.SumObserverKind, metric.Int64NumberKind)
selector := testSelector{aggregation.SumKind} selector := processorTest.AggregatorSelector()
processor := basic.New(selector, ekind, basic.WithMemory(false)) processor := basic.New(selector, ekind, basic.WithMemory(false))
checkpointSet := processor.CheckpointSet() checkpointSet := processor.CheckpointSet()
@ -429,9 +411,9 @@ func TestStatefulNoMemoryDelta(t *testing.T) {
require.NoError(t, processor.FinishCollection()) require.NoError(t, processor.FinishCollection())
// Verify zero elements // Verify zero elements
records := processortest.NewOutput(label.DefaultEncoder()) records := processorTest.NewOutput(label.DefaultEncoder())
require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord)) require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord))
require.EqualValues(t, map[string]float64{}, records.Map) require.EqualValues(t, map[string]float64{}, records.Map())
// Add 10 // Add 10
processor.StartCollection() processor.StartCollection()
@ -439,10 +421,48 @@ func TestStatefulNoMemoryDelta(t *testing.T) {
require.NoError(t, processor.FinishCollection()) require.NoError(t, processor.FinishCollection())
// Verify one element // Verify one element
records = processortest.NewOutput(label.DefaultEncoder()) records = processorTest.NewOutput(label.DefaultEncoder())
require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord)) require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord))
require.EqualValues(t, map[string]float64{ require.EqualValues(t, map[string]float64{
"inst/A=B/R=V": 10, "inst.sum/A=B/R=V": 10,
}, records.Map) }, records.Map())
}
}
func TestMultiObserverSum(t *testing.T) {
for _, ekind := range []export.ExportKind{
export.PassThroughExporter,
export.CumulativeExporter,
export.DeltaExporter,
} {
res := resource.New(kv.String("R", "V"))
desc := metric.NewDescriptor("observe.sum", metric.SumObserverKind, metric.Int64NumberKind)
selector := processorTest.AggregatorSelector()
processor := basic.New(selector, ekind, basic.WithMemory(false))
checkpointSet := processor.CheckpointSet()
for i := 1; i < 3; i++ {
// Add i*10*3 times
processor.StartCollection()
_ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), kv.String("A", "B")))
_ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), kv.String("A", "B")))
_ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), kv.String("A", "B")))
require.NoError(t, processor.FinishCollection())
// Multiplier is 1 for deltas, otherwise i.
multiplier := i
if ekind == export.DeltaExporter {
multiplier = 1
}
// Verify one element
records := processorTest.NewOutput(label.DefaultEncoder())
require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord))
require.EqualValues(t, map[string]float64{
"observe.sum/A=B/R=V": float64(3 * 10 * multiplier),
}, records.Map())
}
} }
} }

View File

@ -32,9 +32,14 @@ import (
) )
type ( type (
nameWithNumKind struct {
name string
numberKind metric.NumberKind
}
// Output collects distinct metric/label set outputs. // Output collects distinct metric/label set outputs.
Output struct { Output struct {
Map map[string]float64 m map[nameWithNumKind]export.Aggregator
labelEncoder label.Encoder labelEncoder label.Encoder
} }
@ -46,7 +51,7 @@ type (
func NewOutput(labelEncoder label.Encoder) Output { func NewOutput(labelEncoder label.Encoder) Output {
return Output{ return Output{
Map: make(map[string]float64), m: make(map[nameWithNumKind]export.Aggregator),
labelEncoder: labelEncoder, labelEncoder: labelEncoder,
} }
} }
@ -107,20 +112,35 @@ func (testAggregatorSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...
func (o Output) AddRecord(rec export.Record) error { func (o Output) AddRecord(rec export.Record) error {
encoded := rec.Labels().Encoded(o.labelEncoder) encoded := rec.Labels().Encoded(o.labelEncoder)
rencoded := rec.Resource().Encoded(o.labelEncoder) rencoded := rec.Resource().Encoded(o.labelEncoder)
key := fmt.Sprint(rec.Descriptor().Name(), "/", encoded, "/", rencoded) key := nameWithNumKind{
var value float64 name: fmt.Sprint(rec.Descriptor().Name(), "/", encoded, "/", rencoded),
numberKind: rec.Descriptor().NumberKind(),
if s, ok := rec.Aggregation().(aggregation.Sum); ok {
sum, _ := s.Sum()
value = sum.CoerceToFloat64(rec.Descriptor().NumberKind())
} else if l, ok := rec.Aggregation().(aggregation.LastValue); ok {
last, _, _ := l.LastValue()
value = last.CoerceToFloat64(rec.Descriptor().NumberKind())
} else {
panic(fmt.Sprintf("Unhandled aggregator type: %T", rec.Aggregation()))
} }
o.Map[key] = value
return nil if _, ok := o.m[key]; !ok {
var agg export.Aggregator
testAggregatorSelector{}.AggregatorFor(rec.Descriptor(), &agg)
o.m[key] = agg
}
return o.m[key].Merge(rec.Aggregation().(export.Aggregator), rec.Descriptor())
}
func (o Output) Map() map[string]float64 {
r := make(map[string]float64)
for nnk, agg := range o.m {
value := 0.0
if s, ok := agg.(aggregation.Sum); ok {
sum, _ := s.Sum()
value = sum.CoerceToFloat64(nnk.numberKind)
} else if l, ok := agg.(aggregation.LastValue); ok {
last, _, _ := l.LastValue()
value = last.CoerceToFloat64(nnk.numberKind)
} else {
panic(fmt.Sprintf("Unhandled aggregator type: %T", agg))
}
r[nnk.name] = value
}
return r
} }
// AddAccumulation adds a string representation of the exported metric // AddAccumulation adds a string representation of the exported metric