1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-02-09 13:37:12 +02:00

Remove Context arguments from Aggregator.Checkpoint and Integrator.Process (#803)

* Typo

* Swap order of ddsketch.New for consistency w/ histogram.New

* Remove Integrator.Process ctx argument

* Remove Aggregator.Checkpoint ctx argument

* Revert bugfix
This commit is contained in:
Joshua MacDonald 2020-06-09 11:00:50 -07:00 committed by GitHub
parent a43367a7a4
commit 9401bd9cda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 104 additions and 207 deletions

View File

@ -71,7 +71,7 @@ func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggrega
return nil
}
func (*benchFixture) Process(context.Context, export.Record) error {
func (*benchFixture) Process(export.Record) error {
return nil
}

View File

@ -101,7 +101,7 @@ func TestStdoutTimestamp(t *testing.T) {
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Int64NumberKind)
lvagg := lastvalue.New()
aggtest.CheckedUpdate(t, lvagg, metric.NewInt64Number(321), &desc)
lvagg.Checkpoint(ctx, &desc)
lvagg.Checkpoint(&desc)
checkpointSet.Add(&desc, lvagg)
@ -146,7 +146,7 @@ func TestStdoutCounterFormat(t *testing.T) {
desc := metric.NewDescriptor("test.name", metric.CounterKind, metric.Int64NumberKind)
cagg := sum.New()
aggtest.CheckedUpdate(fix.t, cagg, metric.NewInt64Number(123), &desc)
cagg.Checkpoint(fix.ctx, &desc)
cagg.Checkpoint(&desc)
checkpointSet.Add(&desc, cagg, kv.String("A", "B"), kv.String("C", "D"))
@ -163,7 +163,7 @@ func TestStdoutLastValueFormat(t *testing.T) {
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
lvagg.Checkpoint(fix.ctx, &desc)
lvagg.Checkpoint(&desc)
checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D"))
@ -181,7 +181,7 @@ func TestStdoutMinMaxSumCount(t *testing.T) {
magg := minmaxsumcount.New(&desc)
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(123.456), &desc)
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(876.543), &desc)
magg.Checkpoint(fix.ctx, &desc)
magg.Checkpoint(&desc)
checkpointSet.Add(&desc, magg, kv.String("A", "B"), kv.String("C", "D"))
@ -204,7 +204,7 @@ func TestStdoutValueRecorderFormat(t *testing.T) {
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(float64(i)+0.5), &desc)
}
magg.Checkpoint(fix.ctx, &desc)
magg.Checkpoint(&desc)
checkpointSet.Add(&desc, magg, kv.String("A", "B"), kv.String("C", "D"))
@ -252,7 +252,7 @@ func TestStdoutNoData(t *testing.T) {
checkpointSet := test.NewCheckpointSet(testResource)
magg := tc
magg.Checkpoint(fix.ctx, &desc)
magg.Checkpoint(&desc)
checkpointSet.Add(&desc, magg)
@ -270,7 +270,7 @@ func TestStdoutLastValueNotSet(t *testing.T) {
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
lvagg.Checkpoint(fix.ctx, &desc)
lvagg.Checkpoint(&desc)
checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D"))
@ -321,7 +321,7 @@ func TestStdoutResource(t *testing.T) {
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
lvagg.Checkpoint(fix.ctx, &desc)
lvagg.Checkpoint(&desc)
checkpointSet.Add(&desc, lvagg, tc.attrs...)

View File

@ -105,7 +105,7 @@ func (p *CheckpointSet) updateAggregator(desc *metric.Descriptor, newAgg export.
ctx := context.Background()
// Updates and checkpoint the new aggregator
_ = newAgg.Update(ctx, createNumber(desc, v), desc)
newAgg.Checkpoint(ctx, desc)
newAgg.Checkpoint(desc)
// Try to add this aggregator to the CheckpointSet
agg, added := p.Add(desc, newAgg, labels...)

View File

@ -89,7 +89,7 @@ func TestMinMaxSumCountValue(t *testing.T) {
assert.EqualError(t, err, aggregator.ErrNoData.Error())
// Checkpoint to set non-zero values
mmsc.Checkpoint(context.Background(), &metric.Descriptor{})
mmsc.Checkpoint(&metric.Descriptor{})
min, max, sum, count, err := minMaxSumCountValues(mmsc)
if assert.NoError(t, err) {
assert.Equal(t, min, metric.NewInt64Number(1))
@ -146,7 +146,7 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
if !assert.NoError(t, mmsc.Update(ctx, 1, &metric.Descriptor{})) {
return
}
mmsc.Checkpoint(ctx, &metric.Descriptor{})
mmsc.Checkpoint(&metric.Descriptor{})
for _, test := range tests {
desc := metric.NewDescriptor(test.name, test.metricKind, test.numberKind,
metric.WithDescription(test.description),
@ -165,7 +165,7 @@ func TestMinMaxSumCountDatapoints(t *testing.T) {
mmsc := minmaxsumcount.New(&desc)
assert.NoError(t, mmsc.Update(context.Background(), 1, &desc))
assert.NoError(t, mmsc.Update(context.Background(), 10, &desc))
mmsc.Checkpoint(context.Background(), &desc)
mmsc.Checkpoint(&desc)
expected := []*metricpb.SummaryDataPoint{
{
Count: 2,
@ -261,7 +261,7 @@ func TestSumInt64DataPoints(t *testing.T) {
labels := label.NewSet()
s := sumAgg.New()
assert.NoError(t, s.Update(context.Background(), metric.Number(1), &desc))
s.Checkpoint(context.Background(), &desc)
s.Checkpoint(&desc)
if m, err := sum(&desc, &labels, s); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint{{Value: 1}}, m.Int64DataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint(nil), m.DoubleDataPoints)
@ -275,7 +275,7 @@ func TestSumFloat64DataPoints(t *testing.T) {
labels := label.NewSet()
s := sumAgg.New()
assert.NoError(t, s.Update(context.Background(), metric.NewFloat64Number(1), &desc))
s.Checkpoint(context.Background(), &desc)
s.Checkpoint(&desc)
if m, err := sum(&desc, &labels, s); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint{{Value: 1}}, m.DoubleDataPoints)

View File

@ -657,7 +657,7 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
default:
t.Fatalf("invalid number kind: %v", r.nKind)
}
agg.Checkpoint(ctx, &desc)
agg.Checkpoint(&desc)
equiv := r.resource.Equivalent()
resources[equiv] = r.resource

View File

@ -62,11 +62,12 @@ type Integrator interface {
// Process is called by the SDK once per internal record,
// passing the export Record (a Descriptor, the corresponding
// Labels, and the checkpointed Aggregator).
//
// The Context argument originates from the controller that
// orchestrates collection.
Process(ctx context.Context, record Record) error
// Labels, and the checkpointed Aggregator). This call has no
// Context argument because it is expected to perform only
// computation. An SDK is not expected to call exporters from
// with Process, use a controller for that (see
// ./controllers/{pull,push}.
Process(record Record) error
}
// AggregationSelector supports selecting the kind of Aggregator to
@ -119,9 +120,9 @@ type Aggregator interface {
// accessed using by converting to one a suitable interface
// types in the `aggregator` sub-package.
//
// The Context argument originates from the controller that
// orchestrates collection.
Checkpoint(context.Context, *metric.Descriptor)
// This call has no Context argument because it is expected to
// perform only computation.
Checkpoint(*metric.Descriptor)
// Merge combines the checkpointed state from the argument
// aggregator into this aggregator's checkpointed state.

View File

@ -85,7 +85,7 @@ func (c *Aggregator) Points() ([]metric.Number, error) {
// Checkpoint saves the current state and resets the current state to
// the empty set, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) {
func (c *Aggregator) Checkpoint(desc *metric.Descriptor) {
c.lock.Lock()
c.checkpoint, c.current = c.current, nil
c.lock.Unlock()

View File

@ -15,7 +15,6 @@
package array
import (
"context"
"fmt"
"math"
"os"
@ -66,8 +65,7 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
test.CheckedUpdate(t, agg, y, descriptor)
}
ctx := context.Background()
agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)
all.Sort()
@ -116,8 +114,6 @@ type mergeTest struct {
}
func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
ctx := context.Background()
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg1 := New()
@ -145,8 +141,8 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
}
}
agg1.Checkpoint(ctx, descriptor)
agg2.Checkpoint(ctx, descriptor)
agg1.Checkpoint(descriptor)
agg2.Checkpoint(descriptor)
test.CheckedMerge(t, agg1, agg2, descriptor)
@ -213,8 +209,6 @@ func TestArrayErrors(t *testing.T) {
require.Error(t, err)
require.Equal(t, err, aggregator.ErrNoData)
ctx := context.Background()
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
test.CheckedUpdate(t, agg, metric.Number(0), descriptor)
@ -222,7 +216,7 @@ func TestArrayErrors(t *testing.T) {
if profile.NumberKind == metric.Float64NumberKind {
test.CheckedUpdate(t, agg, metric.NewFloat64Number(math.NaN()), descriptor)
}
agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)
count, err := agg.Count()
require.Equal(t, int64(1), count, "NaN value was not counted")
@ -275,7 +269,6 @@ func TestArrayFloat64(t *testing.T) {
all := test.NewNumbers(metric.Float64NumberKind)
ctx := context.Background()
agg := New()
for _, f := range fpsf(1) {
@ -288,7 +281,7 @@ func TestArrayFloat64(t *testing.T) {
test.CheckedUpdate(t, agg, metric.NewFloat64Number(f), descriptor)
}
agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)
all.Sort()

View File

@ -103,7 +103,7 @@ func (c *Aggregator) toNumber(f float64) metric.Number {
// Checkpoint saves the current state and resets the current state to
// the empty set, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) Checkpoint(ctx context.Context, _ *metric.Descriptor) {
func (c *Aggregator) Checkpoint(*metric.Descriptor) {
replace := sdk.NewDDSketch(c.cfg)
c.lock.Lock()

View File

@ -15,7 +15,6 @@
package ddsketch
import (
"context"
"fmt"
"testing"
@ -31,8 +30,6 @@ type updateTest struct {
}
func (ut *updateTest) run(t *testing.T, profile test.Profile) {
ctx := context.Background()
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := New(descriptor, NewDefaultConfig())
@ -47,7 +44,7 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
test.CheckedUpdate(t, agg, y, descriptor)
}
agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)
all.Sort()
@ -91,7 +88,6 @@ type mergeTest struct {
}
func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
ctx := context.Background()
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg1 := New(descriptor, NewDefaultConfig())
@ -122,8 +118,8 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
}
}
agg1.Checkpoint(ctx, descriptor)
agg2.Checkpoint(ctx, descriptor)
agg1.Checkpoint(descriptor)
agg2.Checkpoint(descriptor)
test.CheckedMerge(t, agg1, agg2, descriptor)

View File

@ -107,7 +107,7 @@ func (c *Aggregator) Histogram() (aggregator.Buckets, error) {
// the empty set. Since no locks are taken, there is a chance that
// the independent Sum, Count and Bucket Count are not consistent with each
// other.
func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) {
func (c *Aggregator) Checkpoint(desc *metric.Descriptor) {
c.lock.Lock()
c.checkpoint, c.current = c.current, emptyState(c.boundaries)
c.lock.Unlock()

View File

@ -15,7 +15,6 @@
package histogram_test
import (
"context"
"math"
"math/rand"
"sort"
@ -81,7 +80,6 @@ func TestHistogramPositiveAndNegative(t *testing.T) {
// Validates count, sum and buckets for a given profile and policy
func testHistogram(t *testing.T, profile test.Profile, policy policy) {
ctx := context.Background()
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := histogram.New(descriptor, boundaries)
@ -94,7 +92,7 @@ func testHistogram(t *testing.T, profile test.Profile, policy policy) {
test.CheckedUpdate(t, agg, x, descriptor)
}
agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)
all.Sort()
@ -137,8 +135,6 @@ func TestHistogramInitial(t *testing.T) {
}
func TestHistogramMerge(t *testing.T) {
ctx := context.Background()
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
@ -158,8 +154,8 @@ func TestHistogramMerge(t *testing.T) {
test.CheckedUpdate(t, agg2, x, descriptor)
}
agg1.Checkpoint(ctx, descriptor)
agg2.Checkpoint(ctx, descriptor)
agg1.Checkpoint(descriptor)
agg2.Checkpoint(descriptor)
test.CheckedMerge(t, agg1, agg2, descriptor)
@ -192,13 +188,11 @@ func TestHistogramMerge(t *testing.T) {
}
func TestHistogramNotSet(t *testing.T) {
ctx := context.Background()
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := histogram.New(descriptor, boundaries)
agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)
asum, err := agg.Sum()
require.Equal(t, metric.Number(0), asum, "Empty checkpoint sum = 0")

View File

@ -80,7 +80,7 @@ func (g *Aggregator) LastValue() (metric.Number, time.Time, error) {
}
// Checkpoint atomically saves the current value.
func (g *Aggregator) Checkpoint(ctx context.Context, _ *metric.Descriptor) {
func (g *Aggregator) Checkpoint(*metric.Descriptor) {
g.checkpoint = atomic.LoadPointer(&g.current)
}

View File

@ -15,7 +15,6 @@
package lastvalue
import (
"context"
"math/rand"
"os"
"testing"
@ -50,8 +49,6 @@ func TestMain(m *testing.M) {
}
func TestLastValueUpdate(t *testing.T) {
ctx := context.Background()
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
agg := New()
@ -64,7 +61,7 @@ func TestLastValueUpdate(t *testing.T) {
test.CheckedUpdate(t, agg, x, record)
}
agg.Checkpoint(ctx, record)
agg.Checkpoint(record)
lv, _, err := agg.LastValue()
require.Equal(t, last, lv, "Same last value - non-monotonic")
@ -73,8 +70,6 @@ func TestLastValueUpdate(t *testing.T) {
}
func TestLastValueMerge(t *testing.T) {
ctx := context.Background()
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
agg1 := New()
agg2 := New()
@ -88,8 +83,8 @@ func TestLastValueMerge(t *testing.T) {
test.CheckedUpdate(t, agg1, first1, descriptor)
test.CheckedUpdate(t, agg2, first2, descriptor)
agg1.Checkpoint(ctx, descriptor)
agg2.Checkpoint(ctx, descriptor)
agg1.Checkpoint(descriptor)
agg2.Checkpoint(descriptor)
_, t1, err := agg1.LastValue()
require.Nil(t, err)
@ -110,7 +105,7 @@ func TestLastValueNotSet(t *testing.T) {
descriptor := test.NewAggregatorTest(metric.ValueObserverKind, metric.Int64NumberKind)
g := New()
g.Checkpoint(context.Background(), descriptor)
g.Checkpoint(descriptor)
value, timestamp, err := g.LastValue()
require.Equal(t, aggregator.ErrNoData, err)

View File

@ -102,7 +102,7 @@ func (c *Aggregator) Max() (metric.Number, error) {
// Checkpoint saves the current state and resets the current state to
// the empty set.
func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) {
func (c *Aggregator) Checkpoint(desc *metric.Descriptor) {
c.lock.Lock()
c.checkpoint, c.current = c.current, c.emptyState()
c.lock.Unlock()

View File

@ -15,7 +15,6 @@
package minmaxsumcount
import (
"context"
"math"
"math/rand"
"testing"
@ -78,7 +77,6 @@ func TestMinMaxSumCountPositiveAndNegative(t *testing.T) {
// Validates min, max, sum and count for a given profile and policy
func minMaxSumCount(t *testing.T, profile test.Profile, policy policy) {
ctx := context.Background()
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := New(descriptor)
@ -91,7 +89,7 @@ func minMaxSumCount(t *testing.T, profile test.Profile, policy policy) {
test.CheckedUpdate(t, agg, x, descriptor)
}
agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)
all.Sort()
@ -124,8 +122,6 @@ func minMaxSumCount(t *testing.T, profile test.Profile, policy policy) {
}
func TestMinMaxSumCountMerge(t *testing.T) {
ctx := context.Background()
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
@ -145,8 +141,8 @@ func TestMinMaxSumCountMerge(t *testing.T) {
test.CheckedUpdate(t, agg2, x, descriptor)
}
agg1.Checkpoint(ctx, descriptor)
agg2.Checkpoint(ctx, descriptor)
agg1.Checkpoint(descriptor)
agg2.Checkpoint(descriptor)
test.CheckedMerge(t, agg1, agg2, descriptor)
@ -182,13 +178,11 @@ func TestMinMaxSumCountMerge(t *testing.T) {
}
func TestMaxSumCountNotSet(t *testing.T) {
ctx := context.Background()
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := New(descriptor)
agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)
asum, err := agg.Sum()
require.Equal(t, metric.Number(0), asum, "Empty checkpoint sum = 0")

View File

@ -51,7 +51,7 @@ func (c *Aggregator) Sum() (metric.Number, error) {
// Checkpoint atomically saves the current value and resets the
// current sum to zero.
func (c *Aggregator) Checkpoint(ctx context.Context, _ *metric.Descriptor) {
func (c *Aggregator) Checkpoint(*metric.Descriptor) {
c.checkpoint = c.current.SwapNumberAtomic(metric.Number(0))
}

View File

@ -15,7 +15,6 @@
package sum
import (
"context"
"os"
"testing"
"unsafe"
@ -49,8 +48,6 @@ func TestMain(m *testing.M) {
}
func TestCounterSum(t *testing.T) {
ctx := context.Background()
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
agg := New()
@ -63,7 +60,7 @@ func TestCounterSum(t *testing.T) {
test.CheckedUpdate(t, agg, x, descriptor)
}
agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)
asum, err := agg.Sum()
require.Equal(t, sum, asum, "Same sum - monotonic")
@ -72,8 +69,6 @@ func TestCounterSum(t *testing.T) {
}
func TestValueRecorderSum(t *testing.T) {
ctx := context.Background()
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
agg := New()
@ -90,7 +85,7 @@ func TestValueRecorderSum(t *testing.T) {
sum.AddNumber(profile.NumberKind, r2)
}
agg.Checkpoint(ctx, descriptor)
agg.Checkpoint(descriptor)
asum, err := agg.Sum()
require.Equal(t, sum, asum, "Same sum - monotonic")
@ -99,8 +94,6 @@ func TestValueRecorderSum(t *testing.T) {
}
func TestCounterMerge(t *testing.T) {
ctx := context.Background()
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
agg1 := New()
agg2 := New()
@ -115,8 +108,8 @@ func TestCounterMerge(t *testing.T) {
test.CheckedUpdate(t, agg2, x, descriptor)
}
agg1.Checkpoint(ctx, descriptor)
agg2.Checkpoint(ctx, descriptor)
agg1.Checkpoint(descriptor)
agg2.Checkpoint(descriptor)
test.CheckedMerge(t, agg1, agg2, descriptor)

View File

@ -32,13 +32,10 @@ import (
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
)
type processFunc func(context.Context, export.Record) error
type benchFixture struct {
meter metric.MeterMust
accumulator *sdk.Accumulator
B *testing.B
pcb processFunc
}
func newFixture(b *testing.B) *benchFixture {
@ -52,10 +49,6 @@ func newFixture(b *testing.B) *benchFixture {
return bf
}
func (f *benchFixture) setProcessCallback(cb processFunc) {
f.pcb = cb
}
func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
name := descriptor.Name()
switch {
@ -75,11 +68,8 @@ func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggrega
return nil
}
func (f *benchFixture) Process(ctx context.Context, rec export.Record) error {
if f.pcb == nil {
return nil
}
return f.pcb(ctx, rec)
func (f *benchFixture) Process(rec export.Record) error {
return nil
}
func (*benchFixture) CheckpointSet() export.CheckpointSet {
@ -201,28 +191,14 @@ func BenchmarkAcquireReleaseExistingHandle(b *testing.B) {
var benchmarkIteratorVar kv.KeyValue
func benchmarkIterator(b *testing.B, n int) {
fix := newFixture(b)
fix.setProcessCallback(func(ctx context.Context, rec export.Record) error {
var kv kv.KeyValue
li := rec.Labels().Iter()
fix.B.StartTimer()
for i := 0; i < fix.B.N; i++ {
iter := li
// test getting only the first element
if iter.Next() {
kv = iter.Label()
}
}
fix.B.StopTimer()
benchmarkIteratorVar = kv
return nil
})
cnt := fix.meter.NewInt64Counter("int64.counter")
ctx := context.Background()
cnt.Add(ctx, 1, makeLabels(n)...)
labels := label.NewSet(makeLabels(n)...)
b.ResetTimer()
fix.accumulator.Collect(ctx)
for i := 0; i < b.N; i++ {
iter := labels.Iter()
for iter.Next() {
benchmarkIteratorVar = iter.Label()
}
}
}
func BenchmarkIterator_0(b *testing.B) {
@ -560,11 +536,6 @@ func BenchmarkBatchRecord_8Labels_8Instruments(b *testing.B) {
func BenchmarkRepeatedDirectCalls(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
encoder := label.DefaultEncoder()
fix.pcb = func(_ context.Context, rec export.Record) error {
_ = rec.Labels().Encoded(encoder)
return nil
}
c := fix.meter.NewInt64Counter("int64.counter")
k := kv.String("bench", "true")
@ -576,39 +547,3 @@ func BenchmarkRepeatedDirectCalls(b *testing.B) {
fix.accumulator.Collect(ctx)
}
}
// LabelIterator
func BenchmarkLabelIterator(b *testing.B) {
const labelCount = 1024
ctx := context.Background()
fix := newFixture(b)
var rec export.Record
fix.pcb = func(_ context.Context, processRec export.Record) error {
rec = processRec
return nil
}
keyValues := makeLabels(labelCount)
counter := fix.meter.NewInt64Counter("test.counter")
counter.Add(ctx, 1, keyValues...)
fix.accumulator.Collect(ctx)
b.ResetTimer()
labels := rec.Labels()
iter := labels.Iter()
var val kv.KeyValue
for i := 0; i < b.N; i++ {
if !iter.Next() {
iter = labels.Iter()
iter.Next()
}
val = iter.Label()
}
if false {
fmt.Println(val)
}
}

View File

@ -119,7 +119,7 @@ func (ci *correctnessIntegrator) CheckpointSet() export.CheckpointSet {
func (*correctnessIntegrator) FinishedCollection() {
}
func (ci *correctnessIntegrator) Process(_ context.Context, record export.Record) error {
func (ci *correctnessIntegrator) Process(record export.Record) error {
ci.records = append(ci.records, record)
return nil
}

View File

@ -44,7 +44,7 @@ func TestStressInt64Histogram(t *testing.T) {
startTime := time.Now()
for time.Since(startTime) < time.Second {
h.Checkpoint(context.Background(), &desc)
h.Checkpoint(&desc)
b, _ := h.Histogram()
c, _ := h.Count()

View File

@ -15,7 +15,6 @@
package simple // import "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
import (
"context"
"errors"
"sync"
@ -65,7 +64,7 @@ func New(selector export.AggregationSelector, stateful bool) *Integrator {
}
}
func (b *Integrator) Process(_ context.Context, record export.Record) error {
func (b *Integrator) Process(record export.Record) error {
desc := record.Descriptor()
key := batchKey{
descriptor: desc,

View File

@ -30,34 +30,33 @@ import (
// These tests use the ../test label encoding.
func TestSimpleStateless(t *testing.T) {
ctx := context.Background()
b := simple.New(test.NewAggregationSelector(), false)
// Set initial lastValue values
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 10))
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels2, 20))
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels3, 30))
_ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 10))
_ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels2, 20))
_ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels3, 30))
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueBDesc, test.Labels1, 10))
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueBDesc, test.Labels2, 20))
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueBDesc, test.Labels3, 30))
_ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels1, 10))
_ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels2, 20))
_ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels3, 30))
// Another lastValue Set for Labels1
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 50))
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueBDesc, test.Labels1, 50))
_ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 50))
_ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels1, 50))
// Set initial counter values
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10))
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterADesc, test.Labels2, 20))
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterADesc, test.Labels3, 40))
_ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10))
_ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels2, 20))
_ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels3, 40))
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 10))
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels2, 20))
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels3, 40))
_ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 10))
_ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels2, 20))
_ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels3, 40))
// Another counter Add for Labels1
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterADesc, test.Labels1, 50))
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 50))
_ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels1, 50))
_ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 50))
checkpointSet := b.CheckpointSet()
@ -97,11 +96,11 @@ func TestSimpleStateful(t *testing.T) {
counterA := test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10)
caggA := counterA.Aggregator()
_ = b.Process(ctx, counterA)
_ = b.Process(counterA)
counterB := test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 10)
caggB := counterB.Aggregator()
_ = b.Process(ctx, counterB)
_ = b.Process(counterB)
checkpointSet := b.CheckpointSet()
b.FinishedCollection()
@ -126,8 +125,8 @@ func TestSimpleStateful(t *testing.T) {
// Update and re-checkpoint the original record.
_ = caggA.Update(ctx, metric.NewInt64Number(20), &test.CounterADesc)
_ = caggB.Update(ctx, metric.NewInt64Number(20), &test.CounterBDesc)
caggA.Checkpoint(ctx, &test.CounterADesc)
caggB.Checkpoint(ctx, &test.CounterBDesc)
caggA.Checkpoint(&test.CounterADesc)
caggB.Checkpoint(&test.CounterBDesc)
// As yet cagg has not been passed to Integrator.Process. Should
// not see an update.
@ -140,8 +139,8 @@ func TestSimpleStateful(t *testing.T) {
b.FinishedCollection()
// Now process the second update
_ = b.Process(ctx, export.NewRecord(&test.CounterADesc, test.Labels1, test.Resource, caggA))
_ = b.Process(ctx, export.NewRecord(&test.CounterBDesc, test.Labels1, test.Resource, caggB))
_ = b.Process(export.NewRecord(&test.CounterADesc, test.Labels1, test.Resource, caggA))
_ = b.Process(export.NewRecord(&test.CounterBDesc, test.Labels1, test.Resource, caggB))
checkpointSet = b.CheckpointSet()

View File

@ -131,7 +131,7 @@ func LastValueAgg(desc *metric.Descriptor, v int64) export.Aggregator {
ctx := context.Background()
gagg := lastvalue.New()
_ = gagg.Update(ctx, metric.NewInt64Number(v), desc)
gagg.Checkpoint(ctx, desc)
gagg.Checkpoint(desc)
return gagg
}
@ -150,7 +150,7 @@ func CounterAgg(desc *metric.Descriptor, v int64) export.Aggregator {
ctx := context.Background()
cagg := sum.New()
_ = cagg.Update(ctx, metric.NewInt64Number(v), desc)
cagg.Checkpoint(ctx, desc)
cagg.Checkpoint(desc)
return cagg
}

View File

@ -46,7 +46,7 @@ func TestStressInt64MinMaxSumCount(t *testing.T) {
startTime := time.Now()
for time.Since(startTime) < time.Second {
mmsc.Checkpoint(context.Background(), &desc)
mmsc.Checkpoint(&desc)
s, _ := mmsc.Sum()
c, _ := mmsc.Count()

View File

@ -50,7 +50,6 @@ type (
// `*asyncInstrument` instances
asyncLock sync.Mutex
asyncInstruments *internal.AsyncInstrumentState
asyncContext context.Context
// currentEpoch is the current epoch number. It is
// incremented in `Collect()`.
@ -354,13 +353,13 @@ func (m *Accumulator) Collect(ctx context.Context) int {
defer m.collectLock.Unlock()
checkpointed := m.observeAsyncInstruments(ctx)
checkpointed += m.collectSyncInstruments(ctx)
checkpointed += m.collectSyncInstruments()
m.currentEpoch++
return checkpointed
}
func (m *Accumulator) collectSyncInstruments(ctx context.Context) int {
func (m *Accumulator) collectSyncInstruments() int {
checkpointed := 0
m.current.Range(func(key interface{}, value interface{}) bool {
@ -374,7 +373,7 @@ func (m *Accumulator) collectSyncInstruments(ctx context.Context) int {
if mods != coll {
// Updates happened in this interval,
// checkpoint and continue.
checkpointed += m.checkpointRecord(ctx, inuse)
checkpointed += m.checkpointRecord(inuse)
inuse.collectedCount = mods
return true
}
@ -395,7 +394,7 @@ func (m *Accumulator) collectSyncInstruments(ctx context.Context) int {
// last we'll see of this record, checkpoint
mods = atomic.LoadInt64(&inuse.updateCount)
if mods != coll {
checkpointed += m.checkpointRecord(ctx, inuse)
checkpointed += m.checkpointRecord(inuse)
}
return true
})
@ -419,10 +418,9 @@ func (m *Accumulator) observeAsyncInstruments(ctx context.Context) int {
defer m.asyncLock.Unlock()
asyncCollected := 0
m.asyncContext = ctx
// TODO: change this to `ctx` (in a separate PR, with tests)
m.asyncInstruments.Run(context.Background(), m)
m.asyncContext = nil
for _, inst := range m.asyncInstruments.Instruments() {
if a := m.fromAsync(inst); a != nil {
@ -433,8 +431,8 @@ func (m *Accumulator) observeAsyncInstruments(ctx context.Context) int {
return asyncCollected
}
func (m *Accumulator) checkpointRecord(ctx context.Context, r *record) int {
return m.checkpoint(ctx, &r.inst.descriptor, r.recorder, r.labels)
func (m *Accumulator) checkpointRecord(r *record) int {
return m.checkpoint(&r.inst.descriptor, r.recorder, r.labels)
}
func (m *Accumulator) checkpointAsync(a *asyncInstrument) int {
@ -446,7 +444,7 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int {
lrec := lrec
epochDiff := m.currentEpoch - lrec.observedEpoch
if epochDiff == 0 {
checkpointed += m.checkpoint(m.asyncContext, &a.descriptor, lrec.recorder, lrec.labels)
checkpointed += m.checkpoint(&a.descriptor, lrec.recorder, lrec.labels)
} else if epochDiff > 1 {
// This is second collection cycle with no
// observations for this labelset. Remove the
@ -460,14 +458,14 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int {
return checkpointed
}
func (m *Accumulator) checkpoint(ctx context.Context, descriptor *metric.Descriptor, recorder export.Aggregator, labels *label.Set) int {
func (m *Accumulator) checkpoint(descriptor *metric.Descriptor, recorder export.Aggregator, labels *label.Set) int {
if recorder == nil {
return 0
}
recorder.Checkpoint(ctx, descriptor)
recorder.Checkpoint(descriptor)
exportRecord := export.NewRecord(descriptor, labels, m.resource, recorder)
err := m.integrator.Process(ctx, exportRecord)
err := m.integrator.Process(exportRecord)
if err != nil {
global.Handle(err)
}

View File

@ -263,7 +263,7 @@ func (*testFixture) CheckpointSet() export.CheckpointSet {
func (*testFixture) FinishedCollection() {
}
func (f *testFixture) Process(_ context.Context, record export.Record) error {
func (f *testFixture) Process(record export.Record) error {
labels := record.Labels().ToSlice()
key := testKey{
labels: canonicalizeLabels(labels),