From 3a05cd93255fd44c233ece4fc843a6e840ab56c7 Mon Sep 17 00:00:00 2001
From: Joshua MacDonald <jmacd@users.noreply.github.com>
Date: Tue, 11 Aug 2020 10:25:47 -0700
Subject: [PATCH] Change Metric Processor to merge multiple observations
 (#1024)

* Add regexp filter in api/label, test

* Add regexp option to sdk.Config

* Return indistinct values only when keyRe != nil

* Filter in sdk

* Add an accumulator filter test

* SDK tests pass

* Precommit

* Undo set filters

* Backout related filter changes

* Add a new test

* Fix build

* Apply suggestions from code review

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>

* Update comments

* Apply suggestions from code review

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
---
 sdk/metric/controller/pull/pull_test.go    |  24 ++--
 sdk/metric/correct_test.go                 |   8 +-
 sdk/metric/processor/basic/basic.go        | 130 +++++++-----------
 sdk/metric/processor/basic/basic_test.go   | 150 ++++++++++++---------
 sdk/metric/processor/processortest/test.go |  50 ++++---
 5 files changed, 183 insertions(+), 179 deletions(-)
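
In short: within a single collection round, the basic Processor now
merges multiple Accumulations that map to the same instrument and
label set, instead of keeping only the last value for asynchronous
instruments.  This arises when multiple Accumulators feed one
Processor, or when a label filter collapses distinct label sets onto
one stateKey.  A condensed sketch of the new Process() flow (the full
code is in sdk/metric/processor/basic/basic.go below):

	if !sameCollection {
		if !value.currentOwned {
			// First Accumulation for this stateKey: keep a
			// reference to the Accumulator's Aggregator.
			value.current = agg
			return nil
		}
		return agg.SynchronizedMove(value.current, desc)
	}
	if !value.currentOwned {
		// Second Accumulation this round: copy the referenced
		// state into a processor-owned Aggregator so the merge
		// below cannot mutate an Accumulator's state.
		tmp := value.current
		b.AggregatorSelector.AggregatorFor(desc, &value.current)
		value.currentOwned = true
		if err := tmp.SynchronizedMove(value.current, desc); err != nil {
			return err
		}
	}
	// Combine this Accumulation with the prior Accumulation.
	return value.current.Merge(agg, desc)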

diff --git a/sdk/metric/controller/pull/pull_test.go b/sdk/metric/controller/pull/pull_test.go
index 8a129caf8..18ef8eeb4 100644
--- a/sdk/metric/controller/pull/pull_test.go
+++ b/sdk/metric/controller/pull/pull_test.go
@@ -41,7 +41,7 @@ func TestPullNoCache(t *testing.T) {
 
 	ctx := context.Background()
 	meter := puller.Provider().Meter("nocache")
-	counter := metric.Must(meter).NewInt64Counter("counter")
+	counter := metric.Must(meter).NewInt64Counter("counter.sum")
 
 	counter.Add(ctx, 10, kv.String("A", "B"))
 
@@ -50,8 +50,8 @@ func TestPullNoCache(t *testing.T) {
 	require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))
 
 	require.EqualValues(t, map[string]float64{
-		"counter/A=B/": 10,
-	}, records.Map)
+		"counter.sum/A=B/": 10,
+	}, records.Map())
 
 	counter.Add(ctx, 10, kv.String("A", "B"))
 
@@ -60,8 +60,8 @@ func TestPullNoCache(t *testing.T) {
 	require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))
 
 	require.EqualValues(t, map[string]float64{
-		"counter/A=B/": 20,
-	}, records.Map)
+		"counter.sum/A=B/": 20,
+	}, records.Map())
 }
 
 func TestPullWithCache(t *testing.T) {
@@ -75,7 +75,7 @@ func TestPullWithCache(t *testing.T) {
 
 	ctx := context.Background()
 	meter := puller.Provider().Meter("nocache")
-	counter := metric.Must(meter).NewInt64Counter("counter")
+	counter := metric.Must(meter).NewInt64Counter("counter.sum")
 
 	counter.Add(ctx, 10, kv.String("A", "B"))
 
@@ -84,8 +84,8 @@ func TestPullWithCache(t *testing.T) {
 	require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))
 
 	require.EqualValues(t, map[string]float64{
-		"counter/A=B/": 10,
-	}, records.Map)
+		"counter.sum/A=B/": 10,
+	}, records.Map())
 
 	counter.Add(ctx, 10, kv.String("A", "B"))
 
@@ -95,8 +95,8 @@ func TestPullWithCache(t *testing.T) {
 	require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))
 
 	require.EqualValues(t, map[string]float64{
-		"counter/A=B/": 10,
-	}, records.Map)
+		"counter.sum/A=B/": 10,
+	}, records.Map())
 
 	mock.Add(time.Second)
 	runtime.Gosched()
@@ -107,7 +107,7 @@ func TestPullWithCache(t *testing.T) {
 	require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))
 
 	require.EqualValues(t, map[string]float64{
-		"counter/A=B/": 20,
-	}, records.Map)
+		"counter.sum/A=B/": 20,
+	}, records.Map())
 
 }
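
Note on the renames above: these tests now use the shared
processorTest.AggregatorSelector(), which (per the comment added in
basic_test.go below) picks the aggregation kind from the instrument
name, hence the "counter.sum" suffix.  A minimal sketch of that
suffix convention, using a hypothetical helper name:

	import (
		"strings"

		"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
	)

	// kindForName illustrates (hypothetically) how an instrument
	// name suffix can select the aggregation kind in tests.
	func kindForName(name string) aggregation.Kind {
		switch {
		case strings.HasSuffix(name, ".lastvalue"):
			return aggregation.LastValueKind
		case strings.HasSuffix(name, ".exact"):
			return aggregation.ExactKind
		default: // e.g. ".sum"
			return aggregation.SumKind
		}
	}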
diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go
index 8f69a439a..6f4aae976 100644
--- a/sdk/metric/correct_test.go
+++ b/sdk/metric/correct_test.go
@@ -364,7 +364,7 @@ func TestObserverCollection(t *testing.T) {
 		"float.updownsumobserver.sum/C=D/R=V": 1,
 		"int.updownsumobserver.sum//R=V":      -1,
 		"int.updownsumobserver.sum/A=B/R=V":   1,
-	}, out.Map)
+	}, out.Map())
 }
 
 func TestSumObserverInputRange(t *testing.T) {
@@ -467,7 +467,7 @@ func TestObserverBatch(t *testing.T) {
 		"float.valueobserver.lastvalue/C=D/R=V": -1,
 		"int.valueobserver.lastvalue//R=V":      1,
 		"int.valueobserver.lastvalue/A=B/R=V":   1,
-	}, out.Map)
+	}, out.Map())
 }
 
 func TestRecordBatch(t *testing.T) {
@@ -502,7 +502,7 @@ func TestRecordBatch(t *testing.T) {
 		"float64.sum/A=B,C=D/R=V":   2,
 		"int64.exact/A=B,C=D/R=V":   3,
 		"float64.exact/A=B,C=D/R=V": 4,
-	}, out.Map)
+	}, out.Map())
 }
 
 // TestRecordPersistence ensures that a direct-called instrument that
@@ -582,5 +582,5 @@ func TestSyncInAsync(t *testing.T) {
 	require.EqualValues(t, map[string]float64{
 		"counter.sum//R=V":        100,
 		"observer.lastvalue//R=V": 10,
-	}, out.Map)
+	}, out.Map())
 }
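
Note: Output.Map was previously a plain map field; it is now an
accessor, Output.Map(), computed from merged Aggregators at read time
(see sdk/metric/processor/processortest/test.go below), hence the
mechanical out.Map -> out.Map() changes in this file.  Typical usage,
mirroring the tests above (checkpointSet and ekind as in
basic_test.go below):

	out := processorTest.NewOutput(label.DefaultEncoder())
	require.NoError(t, checkpointSet.ForEach(ekind, out.AddRecord))
	require.EqualValues(t, map[string]float64{
		"counter.sum//R=V": 100,
	}, out.Map())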
diff --git a/sdk/metric/processor/basic/basic.go b/sdk/metric/processor/basic/basic.go
index 09ebe9262..5ad1beb49 100644
--- a/sdk/metric/processor/basic/basic.go
+++ b/sdk/metric/processor/basic/basic.go
@@ -67,28 +67,29 @@ type (
 		// being maintained, taken from the process start time.
 		stateful bool
 
-		// TODO: as seen in lengthy comments below, both the
-		// `current` and `delta` fields have multiple uses
-		// depending on the specific configuration of
-		// instrument, exporter, and accumulator.  It is
-		// possible to simplify this situation by declaring
-		// explicit fields that are not used with a dual
-		// purpose.  Improve this situation?
-		//
-		// 1. "delta" is used to combine deltas from multiple
-		// accumulators, and it is also used to store the
-		// output of subtraction when computing deltas of
-		// PrecomputedSum instruments.
-		//
-		// 2. "current" either refers to the Aggregator passed
-		// to Process() by a single accumulator (when either
-		// there is just one Accumulator, or the instrument is
-		// Asynchronous), or it refers to "delta", depending
-		// on configuration.
+		// currentOwned indicates that "current" was allocated
+		// by the processor in order to merge results from
+		// multiple Accumulators during a single collection
+		// round, which may happen because:
+		// (1) multiple Accumulators output the same Accumulation, or
+		// (2) one Accumulator is configured with dimensionality reduction.
+		currentOwned bool
 
-		current    export.Aggregator // refers to single-accumulator checkpoint or delta.
-		delta      export.Aggregator // owned if multi accumulator else nil.
-		cumulative export.Aggregator // owned if stateful else nil.
+		// current refers to the output from a single Accumulator
+		// (if !currentOwned) or it refers to an Aggregator
+		// owned by the processor used to accumulate multiple
+		// values in a single collection round.
+		current export.Aggregator
+
+		// delta, if non-nil, refers to an Aggregator owned by
+		// the processor used to compute deltas between
+		// precomputed sums.
+		delta export.Aggregator
+
+		// cumulative, if non-nil, refers to an Aggregator owned
+		// by the processor used to store the last cumulative
+		// value.
+		cumulative export.Aggregator
 	}
 
 	state struct {
@@ -172,10 +173,8 @@ func (b *Processor) Process(accum export.Accumulation) error {
 				// If we know we need to compute deltas, allocate two aggregators.
 				b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta)
 			} else {
-				// In this case we are not certain to need a delta, only allocate a
-				// cumulative aggregator.  We _may_ need a delta accumulator if
-				// multiple synchronous Accumulators produce an Accumulation (handled
-				// below), which requires merging them into a temporary Aggregator.
+				// In this case we are certain not to need a delta, so
+				// allocate only a cumulative aggregator.
 				b.AggregatorFor(desc, &newValue.cumulative)
 			}
 		}
@@ -212,71 +211,36 @@ func (b *Processor) Process(accum export.Accumulation) error {
 	// Case (b) occurs when the variable `sameCollection` is true,
 	// indicating that the stateKey for Accumulation has already
 	// been seen in the same collection.  When this happens, it
-	// implies that multiple Accumulators are being used because
-	// the Accumulator outputs a maximum of one Accumulation per
-	// instrument and label set.
-	//
-	// The following logic distinguishes between asynchronous and
-	// synchronous instruments in order to ensure that the use of
-	// multiple Accumulators does not change instrument semantics.
-	// To maintain the instrument semantics, multiple synchronous
-	// Accumulations should be merged, whereas when multiple
-	// asynchronous Accumulations are processed, the last value
-	// should be kept.
+	// implies that multiple Accumulators are being used, or that
+	// a single Accumulator has been configured with a label key
+	// filter.
 
 	if !sameCollection {
-		// This is the first Accumulation we've seen for this
-		// stateKey during this collection.  Just keep a
-		// reference to the Accumulator's Aggregator.
-		value.current = agg
-		return nil
-	}
-	if desc.MetricKind().Asynchronous() {
-		// The last value across multiple accumulators is taken.
-		// Just keep a reference to the Accumulator's Aggregator.
-		value.current = agg
-		return nil
+		if !value.currentOwned {
+			// This is the first Accumulation we've seen
+			// for this stateKey during this collection.
+			// Just keep a reference to the Accumulator's
+			// Aggregator.  All the other cases copy
+			// Aggregator state.
+			value.current = agg
+			return nil
+		}
+		return agg.SynchronizedMove(value.current, desc)
 	}
 
-	// The above two cases are keeping a reference to the
-	// Accumulator's Aggregator.  The remaining cases address
-	// synchronous instruments, which always merge multiple
-	// Accumulations using `value.delta` for temporary storage.
-
-	if value.delta == nil {
-		// The temporary `value.delta` may have been allocated
-		// already, either in a prior pass through this block of
-		// code or in the `!ok` branch above.  It would be
-		// allocated in the `!ok` branch if this is stateful
-		// PrecomputedSum instrument (in which case the exporter
-		// is requesting a delta so we allocate it up front),
-		// and it would be allocated in this block when multiple
-		// accumulators are used and the first condition is not
-		// met.
-		b.AggregatorSelector.AggregatorFor(desc, &value.delta)
-	}
-	if value.current != value.delta {
-		// If the current and delta Aggregators are not the same it
-		// implies that multiple Accumulators were used.  The first
-		// Accumulation seen for a given stateKey will return in
-		// one of the cases above after assigning `value.current
-		// = agg` (i.e., after taking a reference to the
-		// Accumulator's Aggregator).
-		//
-		// The second time through this branch copies the
-		// Accumulator's Aggregator into `value.delta` and sets
-		// `value.current` appropriately to avoid this branch if
-		// a third Accumulator is used.
-		err := value.current.SynchronizedMove(value.delta, desc)
-		if err != nil {
+	// If the current aggregator is not owned, take ownership
+	// of a copy before merging below.
+	if !value.currentOwned {
+		tmp := value.current
+		b.AggregatorSelector.AggregatorFor(desc, &value.current)
+		value.currentOwned = true
+		if err := tmp.SynchronizedMove(value.current, desc); err != nil {
 			return err
 		}
-		value.current = value.delta
 	}
-	// The two statements above ensures that `value.current` refers
-	// to `value.delta` and not to an Accumulator's Aggregator.  Now
-	// combine this Accumulation with the prior Accumulation.
-	return value.delta.Merge(agg, desc)
+
+	// Combine this Accumulation with the prior Accumulation.
+	return value.current.Merge(agg, desc)
 }
 
 // CheckpointSet returns the associated CheckpointSet.  Use the
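
Worked example of the ownership logic above: suppose two Accumulators
each report a SumObserver value of 10 for the same labels in one
collection round.  The first Process() call stores a reference
(value.current = agg, currentOwned == false).  The second call finds
sameCollection == true, copies the referenced state into a
processor-owned Aggregator via SynchronizedMove, sets currentOwned,
and then Merges the second Aggregator, yielding 20.  Before this
patch, the asynchronous path kept only the last value, yielding 10.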
diff --git a/sdk/metric/processor/basic/basic_test.go b/sdk/metric/processor/basic/basic_test.go
index 1dcab1b7b..cee2896c4 100644
--- a/sdk/metric/processor/basic/basic_test.go
+++ b/sdk/metric/processor/basic/basic_test.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"strings"
 	"testing"
 	"time"
 
@@ -29,14 +30,8 @@ import (
 	export "go.opentelemetry.io/otel/sdk/export/metric"
 	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
 	"go.opentelemetry.io/otel/sdk/export/metric/metrictest"
-	"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
-	"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
-	"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
-	"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
-	"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
-	"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
 	"go.opentelemetry.io/otel/sdk/metric/processor/basic"
-	"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
+	processorTest "go.opentelemetry.io/otel/sdk/metric/processor/processortest"
 	"go.opentelemetry.io/otel/sdk/resource"
 )
 
@@ -101,29 +96,6 @@ func TestProcessor(t *testing.T) {
 	}
 }
 
-type testSelector struct {
-	kind aggregation.Kind
-}
-
-func (ts testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) {
-	for i := range aggPtrs {
-		switch ts.kind {
-		case aggregation.SumKind:
-			*aggPtrs[i] = &sum.New(1)[0]
-		case aggregation.MinMaxSumCountKind:
-			*aggPtrs[i] = &minmaxsumcount.New(1, desc)[0]
-		case aggregation.HistogramKind:
-			*aggPtrs[i] = &histogram.New(1, desc, nil)[0]
-		case aggregation.LastValueKind:
-			*aggPtrs[i] = &lastvalue.New(1)[0]
-		case aggregation.SketchKind:
-			*aggPtrs[i] = &ddsketch.New(1, desc, nil)[0]
-		case aggregation.ExactKind:
-			*aggPtrs[i] = &array.New(1)[0]
-		}
-	}
-}
-
 func asNumber(nkind metric.NumberKind, value int64) metric.Number {
 	if nkind == metric.Int64NumberKind {
 		return metric.NewInt64Number(value)
@@ -147,18 +119,22 @@ func testProcessor(
 	nkind metric.NumberKind,
 	akind aggregation.Kind,
 ) {
-	selector := testSelector{akind}
+	// Note: this selector uses the instrument name to dictate
+	// aggregation kind.
+	selector := processorTest.AggregatorSelector()
 	res := resource.New(kv.String("R", "V"))
 
 	labs1 := []kv.KeyValue{kv.String("L1", "V")}
 	labs2 := []kv.KeyValue{kv.String("L2", "V")}
 
-	desc1 := metric.NewDescriptor("inst1", mkind, nkind)
-	desc2 := metric.NewDescriptor("inst2", mkind, nkind)
-
 	testBody := func(t *testing.T, hasMemory bool, nAccum, nCheckpoint int) {
 		processor := basic.New(selector, ekind, basic.WithMemory(hasMemory))
 
+		instSuffix := fmt.Sprint(".", strings.ToLower(akind.String()))
+
+		desc1 := metric.NewDescriptor(fmt.Sprint("inst1", instSuffix), mkind, nkind)
+		desc2 := metric.NewDescriptor(fmt.Sprint("inst2", instSuffix), mkind, nkind)
+
 		for nc := 0; nc < nCheckpoint; nc++ {
 
 			// The input is 10 per update, scaled by
@@ -207,7 +183,7 @@ func testProcessor(
 				}
 
 				// Test the final checkpoint state.
-				records1 := processortest.NewOutput(label.DefaultEncoder())
+				records1 := processorTest.NewOutput(label.DefaultEncoder())
 				err = checkpointSet.ForEach(ekind, records1.AddRecord)
 
 				// Test for an allowed error:
@@ -217,19 +193,24 @@ func testProcessor(
 				var multiplier int64
 
 				if mkind.Asynchronous() {
-					// Because async instruments take the last value,
-					// the number of accumulators doesn't matter.
+					// Asynchronous tests accumulate results that are multiplied
+					// by the number of Accumulators, except with LastValue
+					// aggregation.  For a precomputed sum, we expect cumulative inputs.
 					if mkind.PrecomputedSum() {
-						if ekind == export.DeltaExporter {
-							multiplier = 1
-						} else {
+						if ekind == export.DeltaExporter && akind != aggregation.LastValueKind {
+							multiplier = int64(nAccum)
+						} else if akind == aggregation.LastValueKind {
 							multiplier = cumulativeMultiplier
+						} else {
+							multiplier = cumulativeMultiplier * int64(nAccum)
 						}
 					} else {
 						if ekind == export.CumulativeExporter && akind != aggregation.LastValueKind {
-							multiplier = cumulativeMultiplier
-						} else {
+							multiplier = cumulativeMultiplier * int64(nAccum)
+						} else if akind == aggregation.LastValueKind {
 							multiplier = 1
+						} else {
+							multiplier = int64(nAccum)
 						}
 					}
 				} else {
@@ -249,11 +230,12 @@ func testProcessor(
 				exp := map[string]float64{}
 				if hasMemory || !repetitionAfterEmptyInterval {
 					exp = map[string]float64{
-						"inst1/L1=V/R=V": float64(multiplier * 10), // labels1
-						"inst2/L2=V/R=V": float64(multiplier * 10), // labels2
+						fmt.Sprintf("inst1%s/L1=V/R=V", instSuffix): float64(multiplier * 10), // labels1
+						fmt.Sprintf("inst2%s/L2=V/R=V", instSuffix): float64(multiplier * 10), // labels2
 					}
 				}
-				require.EqualValues(t, exp, records1.Map, "with repetition=%v", repetitionAfterEmptyInterval)
+
+				require.EqualValues(t, exp, records1.Map(), "with repetition=%v", repetitionAfterEmptyInterval)
 			}
 		}
 	}
@@ -287,19 +269,19 @@ func (bogusExporter) Export(context.Context, export.CheckpointSet) error {
 
 func TestBasicInconsistent(t *testing.T) {
 	// Test double-start
-	b := basic.New(processortest.AggregatorSelector(), export.PassThroughExporter)
+	b := basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter)
 
 	b.StartCollection()
 	b.StartCollection()
 	require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())
 
 	// Test finish without start
-	b = basic.New(processortest.AggregatorSelector(), export.PassThroughExporter)
+	b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter)
 
 	require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())
 
 	// Test no finish
-	b = basic.New(processortest.AggregatorSelector(), export.PassThroughExporter)
+	b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter)
 
 	b.StartCollection()
 	require.Equal(
@@ -312,14 +294,14 @@ func TestBasicInconsistent(t *testing.T) {
 	)
 
 	// Test no start
-	b = basic.New(processortest.AggregatorSelector(), export.PassThroughExporter)
+	b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter)
 
 	desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind)
 	accum := export.NewAccumulation(&desc, label.EmptySet(), resource.Empty(), metrictest.NoopAggregator{})
 	require.Equal(t, basic.ErrInconsistentState, b.Process(accum))
 
 	// Test invalid kind:
-	b = basic.New(processortest.AggregatorSelector(), export.PassThroughExporter)
+	b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter)
 	b.StartCollection()
 	require.NoError(t, b.Process(accum))
 	require.NoError(t, b.FinishCollection())
@@ -334,7 +316,7 @@ func TestBasicInconsistent(t *testing.T) {
 
 func TestBasicTimestamps(t *testing.T) {
 	beforeNew := time.Now()
-	b := basic.New(processortest.AggregatorSelector(), export.PassThroughExporter)
+	b := basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter)
 	afterNew := time.Now()
 
 	desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind)
@@ -383,8 +365,8 @@ func TestStatefulNoMemoryCumulative(t *testing.T) {
 	res := resource.New(kv.String("R", "V"))
 	ekind := export.CumulativeExporter
 
-	desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind)
-	selector := testSelector{aggregation.SumKind}
+	desc := metric.NewDescriptor("inst.sum", metric.CounterKind, metric.Int64NumberKind)
+	selector := processorTest.AggregatorSelector()
 
 	processor := basic.New(selector, ekind, basic.WithMemory(false))
 	checkpointSet := processor.CheckpointSet()
@@ -395,9 +377,9 @@ func TestStatefulNoMemoryCumulative(t *testing.T) {
 		require.NoError(t, processor.FinishCollection())
 
 		// Verify zero elements
-		records := processortest.NewOutput(label.DefaultEncoder())
+		records := processorTest.NewOutput(label.DefaultEncoder())
 		require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord))
-		require.EqualValues(t, map[string]float64{}, records.Map)
+		require.EqualValues(t, map[string]float64{}, records.Map())
 
 		// Add 10
 		processor.StartCollection()
@@ -405,11 +387,11 @@ func TestStatefulNoMemoryCumulative(t *testing.T) {
 		require.NoError(t, processor.FinishCollection())
 
 		// Verify one element
-		records = processortest.NewOutput(label.DefaultEncoder())
+		records = processorTest.NewOutput(label.DefaultEncoder())
 		require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord))
 		require.EqualValues(t, map[string]float64{
-			"inst/A=B/R=V": float64(i * 10),
-		}, records.Map)
+			"inst.sum/A=B/R=V": float64(i * 10),
+		}, records.Map())
 	}
 }
 
@@ -417,8 +399,8 @@ func TestStatefulNoMemoryDelta(t *testing.T) {
 	res := resource.New(kv.String("R", "V"))
 	ekind := export.DeltaExporter
 
-	desc := metric.NewDescriptor("inst", metric.SumObserverKind, metric.Int64NumberKind)
-	selector := testSelector{aggregation.SumKind}
+	desc := metric.NewDescriptor("inst.sum", metric.SumObserverKind, metric.Int64NumberKind)
+	selector := processorTest.AggregatorSelector()
 
 	processor := basic.New(selector, ekind, basic.WithMemory(false))
 	checkpointSet := processor.CheckpointSet()
@@ -429,9 +411,9 @@ func TestStatefulNoMemoryDelta(t *testing.T) {
 		require.NoError(t, processor.FinishCollection())
 
 		// Verify zero elements
-		records := processortest.NewOutput(label.DefaultEncoder())
+		records := processorTest.NewOutput(label.DefaultEncoder())
 		require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord))
-		require.EqualValues(t, map[string]float64{}, records.Map)
+		require.EqualValues(t, map[string]float64{}, records.Map())
 
 		// Add 10
 		processor.StartCollection()
@@ -439,10 +421,48 @@ func TestStatefulNoMemoryDelta(t *testing.T) {
 		require.NoError(t, processor.FinishCollection())
 
 		// Verify one element
-		records = processortest.NewOutput(label.DefaultEncoder())
+		records = processorTest.NewOutput(label.DefaultEncoder())
 		require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord))
 		require.EqualValues(t, map[string]float64{
-			"inst/A=B/R=V": 10,
-		}, records.Map)
+			"inst.sum/A=B/R=V": 10,
+		}, records.Map())
+	}
+}
+
+func TestMultiObserverSum(t *testing.T) {
+	for _, ekind := range []export.ExportKind{
+		export.PassThroughExporter,
+		export.CumulativeExporter,
+		export.DeltaExporter,
+	} {
+
+		res := resource.New(kv.String("R", "V"))
+		desc := metric.NewDescriptor("observe.sum", metric.SumObserverKind, metric.Int64NumberKind)
+		selector := processorTest.AggregatorSelector()
+
+		processor := basic.New(selector, ekind, basic.WithMemory(false))
+		checkpointSet := processor.CheckpointSet()
+
+		for i := 1; i < 3; i++ {
+			// Process i*10 three times, simulating three Accumulators.
+			processor.StartCollection()
+			_ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), kv.String("A", "B")))
+			_ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), kv.String("A", "B")))
+			_ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), kv.String("A", "B")))
+			require.NoError(t, processor.FinishCollection())
+
+			// Multiplier is 1 for deltas, otherwise i.
+			multiplier := i
+			if ekind == export.DeltaExporter {
+				multiplier = 1
+			}
+
+			// Verify one element
+			records := processorTest.NewOutput(label.DefaultEncoder())
+			require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord))
+			require.EqualValues(t, map[string]float64{
+				"observe.sum/A=B/R=V": float64(3 * 10 * multiplier),
+			}, records.Map())
+		}
 	}
 }
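
Arithmetic behind the expected values in TestMultiObserverSum: each
round merges three observations of i*10, so round 1 yields 3*10 = 30
and round 2 yields 3*20 = 60.  Cumulative and pass-through exporters
report the merged total 3*10*i directly, while a delta exporter
reports the change between rounds, 60 - 30 = 30 = 3*10*1, which is
why the multiplier is 1 for deltas and i otherwise.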
diff --git a/sdk/metric/processor/processortest/test.go b/sdk/metric/processor/processortest/test.go
index 422db8cbf..ab5f7499f 100644
--- a/sdk/metric/processor/processortest/test.go
+++ b/sdk/metric/processor/processortest/test.go
@@ -32,9 +32,14 @@ import (
 )
 
 type (
+	nameWithNumKind struct {
+		name       string
+		numberKind metric.NumberKind
+	}
+
 	// Output collects distinct metric/label set outputs.
 	Output struct {
-		Map          map[string]float64
+		m            map[nameWithNumKind]export.Aggregator
 		labelEncoder label.Encoder
 	}
 
@@ -46,7 +51,7 @@ type (
 
 func NewOutput(labelEncoder label.Encoder) Output {
 	return Output{
-		Map:          make(map[string]float64),
+		m:            make(map[nameWithNumKind]export.Aggregator),
 		labelEncoder: labelEncoder,
 	}
 }
@@ -107,20 +112,35 @@ func (testAggregatorSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...
 func (o Output) AddRecord(rec export.Record) error {
 	encoded := rec.Labels().Encoded(o.labelEncoder)
 	rencoded := rec.Resource().Encoded(o.labelEncoder)
-	key := fmt.Sprint(rec.Descriptor().Name(), "/", encoded, "/", rencoded)
-	var value float64
-
-	if s, ok := rec.Aggregation().(aggregation.Sum); ok {
-		sum, _ := s.Sum()
-		value = sum.CoerceToFloat64(rec.Descriptor().NumberKind())
-	} else if l, ok := rec.Aggregation().(aggregation.LastValue); ok {
-		last, _, _ := l.LastValue()
-		value = last.CoerceToFloat64(rec.Descriptor().NumberKind())
-	} else {
-		panic(fmt.Sprintf("Unhandled aggregator type: %T", rec.Aggregation()))
+	key := nameWithNumKind{
+		name:       fmt.Sprint(rec.Descriptor().Name(), "/", encoded, "/", rencoded),
+		numberKind: rec.Descriptor().NumberKind(),
 	}
-	o.Map[key] = value
-	return nil
+
+	if _, ok := o.m[key]; !ok {
+		var agg export.Aggregator
+		testAggregatorSelector{}.AggregatorFor(rec.Descriptor(), &agg)
+		o.m[key] = agg
+	}
+	return o.m[key].Merge(rec.Aggregation().(export.Aggregator), rec.Descriptor())
+}
+
+func (o Output) Map() map[string]float64 {
+	r := make(map[string]float64)
+	for nnk, agg := range o.m {
+		value := 0.0
+		if s, ok := agg.(aggregation.Sum); ok {
+			sum, _ := s.Sum()
+			value = sum.CoerceToFloat64(nnk.numberKind)
+		} else if l, ok := agg.(aggregation.LastValue); ok {
+			last, _, _ := l.LastValue()
+			value = last.CoerceToFloat64(nnk.numberKind)
+		} else {
+			panic(fmt.Sprintf("Unhandled aggregator type: %T", agg))
+		}
+		r[nnk.name] = value
+	}
+	return r
 }
 
 // AddAccumulation adds a string representation of the exported metric