You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-07-15 01:04:25 +02:00
Rename SynchronizedCopy to SynchronizedMove and update comment (#858)
This commit is contained in:
@ -104,7 +104,7 @@ func TestStdoutTimestamp(t *testing.T) {
|
|||||||
lvagg, ckpt := test.Unslice2(lastvalue.New(2))
|
lvagg, ckpt := test.Unslice2(lastvalue.New(2))
|
||||||
|
|
||||||
aggtest.CheckedUpdate(t, lvagg, metric.NewInt64Number(321), &desc)
|
aggtest.CheckedUpdate(t, lvagg, metric.NewInt64Number(321), &desc)
|
||||||
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
|
require.NoError(t, lvagg.SynchronizedMove(ckpt, &desc))
|
||||||
|
|
||||||
checkpointSet.Add(&desc, ckpt)
|
checkpointSet.Add(&desc, ckpt)
|
||||||
|
|
||||||
@ -151,7 +151,7 @@ func TestStdoutCounterFormat(t *testing.T) {
|
|||||||
cagg, ckpt := test.Unslice2(sum.New(2))
|
cagg, ckpt := test.Unslice2(sum.New(2))
|
||||||
|
|
||||||
aggtest.CheckedUpdate(fix.t, cagg, metric.NewInt64Number(123), &desc)
|
aggtest.CheckedUpdate(fix.t, cagg, metric.NewInt64Number(123), &desc)
|
||||||
require.NoError(t, cagg.SynchronizedCopy(ckpt, &desc))
|
require.NoError(t, cagg.SynchronizedMove(ckpt, &desc))
|
||||||
|
|
||||||
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))
|
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))
|
||||||
|
|
||||||
@ -169,7 +169,7 @@ func TestStdoutLastValueFormat(t *testing.T) {
|
|||||||
lvagg, ckpt := test.Unslice2(lastvalue.New(2))
|
lvagg, ckpt := test.Unslice2(lastvalue.New(2))
|
||||||
|
|
||||||
aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
|
aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
|
||||||
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
|
require.NoError(t, lvagg.SynchronizedMove(ckpt, &desc))
|
||||||
|
|
||||||
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))
|
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))
|
||||||
|
|
||||||
@ -189,7 +189,7 @@ func TestStdoutMinMaxSumCount(t *testing.T) {
|
|||||||
|
|
||||||
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(123.456), &desc)
|
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(123.456), &desc)
|
||||||
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(876.543), &desc)
|
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(876.543), &desc)
|
||||||
require.NoError(t, magg.SynchronizedCopy(ckpt, &desc))
|
require.NoError(t, magg.SynchronizedMove(ckpt, &desc))
|
||||||
|
|
||||||
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))
|
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))
|
||||||
|
|
||||||
@ -212,7 +212,7 @@ func TestStdoutValueRecorderFormat(t *testing.T) {
|
|||||||
aggtest.CheckedUpdate(fix.t, aagg, metric.NewFloat64Number(float64(i)+0.5), &desc)
|
aggtest.CheckedUpdate(fix.t, aagg, metric.NewFloat64Number(float64(i)+0.5), &desc)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, aagg.SynchronizedCopy(ckpt, &desc))
|
require.NoError(t, aagg.SynchronizedMove(ckpt, &desc))
|
||||||
|
|
||||||
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))
|
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))
|
||||||
|
|
||||||
@ -256,7 +256,7 @@ func TestStdoutNoData(t *testing.T) {
|
|||||||
|
|
||||||
checkpointSet := test.NewCheckpointSet(testResource)
|
checkpointSet := test.NewCheckpointSet(testResource)
|
||||||
|
|
||||||
require.NoError(t, agg.SynchronizedCopy(ckpt, &desc))
|
require.NoError(t, agg.SynchronizedMove(ckpt, &desc))
|
||||||
|
|
||||||
checkpointSet.Add(&desc, ckpt)
|
checkpointSet.Add(&desc, ckpt)
|
||||||
|
|
||||||
@ -278,7 +278,7 @@ func TestStdoutLastValueNotSet(t *testing.T) {
|
|||||||
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
|
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
|
||||||
|
|
||||||
lvagg, ckpt := test.Unslice2(lastvalue.New(2))
|
lvagg, ckpt := test.Unslice2(lastvalue.New(2))
|
||||||
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
|
require.NoError(t, lvagg.SynchronizedMove(ckpt, &desc))
|
||||||
|
|
||||||
checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D"))
|
checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D"))
|
||||||
|
|
||||||
@ -330,7 +330,7 @@ func TestStdoutResource(t *testing.T) {
|
|||||||
lvagg, ckpt := test.Unslice2(lastvalue.New(2))
|
lvagg, ckpt := test.Unslice2(lastvalue.New(2))
|
||||||
|
|
||||||
aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
|
aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
|
||||||
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
|
require.NoError(t, lvagg.SynchronizedMove(ckpt, &desc))
|
||||||
|
|
||||||
checkpointSet.Add(&desc, ckpt, tc.attrs...)
|
checkpointSet.Add(&desc, ckpt, tc.attrs...)
|
||||||
|
|
||||||
|
@ -52,8 +52,8 @@ func (NoopAggregator) Update(context.Context, metric.Number, *metric.Descriptor)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SynchronizedCopy implements export.Aggregator.
|
// SynchronizedMove implements export.Aggregator.
|
||||||
func (NoopAggregator) SynchronizedCopy(export.Aggregator, *metric.Descriptor) error {
|
func (NoopAggregator) SynchronizedMove(export.Aggregator, *metric.Descriptor) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,7 +101,7 @@ func TestMinMaxSumCountValue(t *testing.T) {
|
|||||||
assert.EqualError(t, err, aggregation.ErrNoData.Error())
|
assert.EqualError(t, err, aggregation.ErrNoData.Error())
|
||||||
|
|
||||||
// Checkpoint to set non-zero values
|
// Checkpoint to set non-zero values
|
||||||
require.NoError(t, mmsc.SynchronizedCopy(ckpt, &metric.Descriptor{}))
|
require.NoError(t, mmsc.SynchronizedMove(ckpt, &metric.Descriptor{}))
|
||||||
min, max, sum, count, err := minMaxSumCountValues(ckpt.(aggregation.MinMaxSumCount))
|
min, max, sum, count, err := minMaxSumCountValues(ckpt.(aggregation.MinMaxSumCount))
|
||||||
if assert.NoError(t, err) {
|
if assert.NoError(t, err) {
|
||||||
assert.Equal(t, min, metric.NewInt64Number(1))
|
assert.Equal(t, min, metric.NewInt64Number(1))
|
||||||
@ -158,7 +158,7 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
|
|||||||
if !assert.NoError(t, mmsc.Update(ctx, 1, &metric.Descriptor{})) {
|
if !assert.NoError(t, mmsc.Update(ctx, 1, &metric.Descriptor{})) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
require.NoError(t, mmsc.SynchronizedCopy(ckpt, &metric.Descriptor{}))
|
require.NoError(t, mmsc.SynchronizedMove(ckpt, &metric.Descriptor{}))
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
desc := metric.NewDescriptor(test.name, test.metricKind, test.numberKind,
|
desc := metric.NewDescriptor(test.name, test.metricKind, test.numberKind,
|
||||||
metric.WithDescription(test.description),
|
metric.WithDescription(test.description),
|
||||||
@ -179,7 +179,7 @@ func TestMinMaxSumCountDatapoints(t *testing.T) {
|
|||||||
|
|
||||||
assert.NoError(t, mmsc.Update(context.Background(), 1, &desc))
|
assert.NoError(t, mmsc.Update(context.Background(), 1, &desc))
|
||||||
assert.NoError(t, mmsc.Update(context.Background(), 10, &desc))
|
assert.NoError(t, mmsc.Update(context.Background(), 10, &desc))
|
||||||
require.NoError(t, mmsc.SynchronizedCopy(ckpt, &desc))
|
require.NoError(t, mmsc.SynchronizedMove(ckpt, &desc))
|
||||||
expected := []*metricpb.SummaryDataPoint{
|
expected := []*metricpb.SummaryDataPoint{
|
||||||
{
|
{
|
||||||
Count: 2,
|
Count: 2,
|
||||||
@ -280,7 +280,7 @@ func TestSumInt64DataPoints(t *testing.T) {
|
|||||||
labels := label.NewSet()
|
labels := label.NewSet()
|
||||||
s, ckpt := test.Unslice2(sumAgg.New(2))
|
s, ckpt := test.Unslice2(sumAgg.New(2))
|
||||||
assert.NoError(t, s.Update(context.Background(), metric.Number(1), &desc))
|
assert.NoError(t, s.Update(context.Background(), metric.Number(1), &desc))
|
||||||
require.NoError(t, s.SynchronizedCopy(ckpt, &desc))
|
require.NoError(t, s.SynchronizedMove(ckpt, &desc))
|
||||||
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
|
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
|
||||||
if m, err := sum(record, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
|
if m, err := sum(record, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
|
||||||
assert.Equal(t, []*metricpb.Int64DataPoint{{
|
assert.Equal(t, []*metricpb.Int64DataPoint{{
|
||||||
@ -299,7 +299,7 @@ func TestSumFloat64DataPoints(t *testing.T) {
|
|||||||
labels := label.NewSet()
|
labels := label.NewSet()
|
||||||
s, ckpt := test.Unslice2(sumAgg.New(2))
|
s, ckpt := test.Unslice2(sumAgg.New(2))
|
||||||
assert.NoError(t, s.Update(context.Background(), metric.NewFloat64Number(1), &desc))
|
assert.NoError(t, s.Update(context.Background(), metric.NewFloat64Number(1), &desc))
|
||||||
require.NoError(t, s.SynchronizedCopy(ckpt, &desc))
|
require.NoError(t, s.SynchronizedMove(ckpt, &desc))
|
||||||
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
|
record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
|
||||||
if m, err := sum(record, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
|
if m, err := sum(record, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
|
||||||
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
|
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
|
||||||
|
@ -741,7 +741,7 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
|
|||||||
default:
|
default:
|
||||||
t.Fatalf("invalid number kind: %v", r.nKind)
|
t.Fatalf("invalid number kind: %v", r.nKind)
|
||||||
}
|
}
|
||||||
require.NoError(t, agg.SynchronizedCopy(ckpt, &desc))
|
require.NoError(t, agg.SynchronizedMove(ckpt, &desc))
|
||||||
|
|
||||||
equiv := r.resource.Equivalent()
|
equiv := r.resource.Equivalent()
|
||||||
resources[equiv] = r.resource
|
resources[equiv] = r.resource
|
||||||
|
@ -127,11 +127,12 @@ type Aggregator interface {
|
|||||||
// inspected for a `correlation.Map` or `trace.SpanContext`.
|
// inspected for a `correlation.Map` or `trace.SpanContext`.
|
||||||
Update(context.Context, metric.Number, *metric.Descriptor) error
|
Update(context.Context, metric.Number, *metric.Descriptor) error
|
||||||
|
|
||||||
// SynchronizedCopy is called during collection to finish one
|
// SynchronizedMove is called during collection to finish one
|
||||||
// period of aggregation by atomically saving the
|
// period of aggregation by atomically saving the
|
||||||
// currently-updating state into the argument Aggregator.
|
// currently-updating state into the argument Aggregator AND
|
||||||
|
// resetting the current value to the zero state.
|
||||||
//
|
//
|
||||||
// SynchronizedCopy() is called concurrently with Update(). These
|
// SynchronizedMove() is called concurrently with Update(). These
|
||||||
// two methods must be synchronized with respect to each
|
// two methods must be synchronized with respect to each
|
||||||
// other, for correctness.
|
// other, for correctness.
|
||||||
//
|
//
|
||||||
@ -145,11 +146,11 @@ type Aggregator interface {
|
|||||||
//
|
//
|
||||||
// This call has no Context argument because it is expected to
|
// This call has no Context argument because it is expected to
|
||||||
// perform only computation.
|
// perform only computation.
|
||||||
SynchronizedCopy(destination Aggregator, descriptor *metric.Descriptor) error
|
SynchronizedMove(destination Aggregator, descriptor *metric.Descriptor) error
|
||||||
|
|
||||||
// Merge combines the checkpointed state from the argument
|
// Merge combines the checkpointed state from the argument
|
||||||
// Aggregator into this Aggregator. Merge is not synchronized
|
// Aggregator into this Aggregator. Merge is not synchronized
|
||||||
// with respect to Update or SynchronizedCopy.
|
// with respect to Update or SynchronizedMove.
|
||||||
//
|
//
|
||||||
// The owner of an Aggregator being merged is responsible for
|
// The owner of an Aggregator being merged is responsible for
|
||||||
// synchronization of both Aggregator states.
|
// synchronization of both Aggregator states.
|
||||||
|
@ -46,7 +46,7 @@ var _ aggregation.Points = &Aggregator{}
|
|||||||
|
|
||||||
// New returns a new array aggregator, which aggregates recorded
|
// New returns a new array aggregator, which aggregates recorded
|
||||||
// measurements by storing them in an array. This type uses a mutex
|
// measurements by storing them in an array. This type uses a mutex
|
||||||
// for Update() and SynchronizedCopy() concurrency.
|
// for Update() and SynchronizedMove() concurrency.
|
||||||
func New(cnt int) []Aggregator {
|
func New(cnt int) []Aggregator {
|
||||||
return make([]Aggregator, cnt)
|
return make([]Aggregator, cnt)
|
||||||
}
|
}
|
||||||
@ -92,9 +92,9 @@ func (c *Aggregator) Points() ([]metric.Number, error) {
|
|||||||
return c.points, nil
|
return c.points, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SynchronizedCopy saves the current state to oa and resets the current state to
|
// SynchronizedMove saves the current state to oa and resets the current state to
|
||||||
// the empty set, taking a lock to prevent concurrent Update() calls.
|
// the empty set, taking a lock to prevent concurrent Update() calls.
|
||||||
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, desc *metric.Descriptor) error {
|
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
|
||||||
o, _ := oa.(*Aggregator)
|
o, _ := oa.(*Aggregator)
|
||||||
if o == nil {
|
if o == nil {
|
||||||
return aggregator.NewInconsistentAggregatorError(c, oa)
|
return aggregator.NewInconsistentAggregatorError(c, oa)
|
||||||
@ -114,7 +114,7 @@ func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, desc *metric.Descrip
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Update adds the recorded measurement to the current data set.
|
// Update adds the recorded measurement to the current data set.
|
||||||
// Update takes a lock to prevent concurrent Update() and SynchronizedCopy()
|
// Update takes a lock to prevent concurrent Update() and SynchronizedMove()
|
||||||
// calls.
|
// calls.
|
||||||
func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metric.Descriptor) error {
|
func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metric.Descriptor) error {
|
||||||
c.lock.Lock()
|
c.lock.Lock()
|
||||||
|
@ -77,7 +77,7 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
|
|||||||
test.CheckedUpdate(t, agg, y, descriptor)
|
test.CheckedUpdate(t, agg, y, descriptor)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := agg.SynchronizedCopy(ckpt, descriptor)
|
err := agg.SynchronizedMove(ckpt, descriptor)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
checkZero(t, agg, descriptor)
|
checkZero(t, agg, descriptor)
|
||||||
@ -154,8 +154,8 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
|
require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor))
|
||||||
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
|
require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor))
|
||||||
|
|
||||||
checkZero(t, agg1, descriptor)
|
checkZero(t, agg1, descriptor)
|
||||||
checkZero(t, agg2, descriptor)
|
checkZero(t, agg2, descriptor)
|
||||||
@ -232,7 +232,7 @@ func TestArrayErrors(t *testing.T) {
|
|||||||
if profile.NumberKind == metric.Float64NumberKind {
|
if profile.NumberKind == metric.Float64NumberKind {
|
||||||
test.CheckedUpdate(t, agg, metric.NewFloat64Number(math.NaN()), descriptor)
|
test.CheckedUpdate(t, agg, metric.NewFloat64Number(math.NaN()), descriptor)
|
||||||
}
|
}
|
||||||
require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
|
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))
|
||||||
|
|
||||||
count, err := ckpt.Count()
|
count, err := ckpt.Count()
|
||||||
require.Equal(t, int64(1), count, "NaN value was not counted")
|
require.Equal(t, int64(1), count, "NaN value was not counted")
|
||||||
@ -297,7 +297,7 @@ func TestArrayFloat64(t *testing.T) {
|
|||||||
test.CheckedUpdate(t, agg, metric.NewFloat64Number(f), descriptor)
|
test.CheckedUpdate(t, agg, metric.NewFloat64Number(f), descriptor)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
|
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))
|
||||||
|
|
||||||
all.Sort()
|
all.Sort()
|
||||||
|
|
||||||
|
@ -112,9 +112,9 @@ func (c *Aggregator) toNumber(f float64) metric.Number {
|
|||||||
return metric.NewInt64Number(int64(f))
|
return metric.NewInt64Number(int64(f))
|
||||||
}
|
}
|
||||||
|
|
||||||
// SynchronizedCopy saves the current state into oa and resets the current state to
|
// SynchronizedMove saves the current state into oa and resets the current state to
|
||||||
// a new sketch, taking a lock to prevent concurrent Update() calls.
|
// a new sketch, taking a lock to prevent concurrent Update() calls.
|
||||||
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, _ *metric.Descriptor) error {
|
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error {
|
||||||
o, _ := oa.(*Aggregator)
|
o, _ := oa.(*Aggregator)
|
||||||
if o == nil {
|
if o == nil {
|
||||||
return aggregator.NewInconsistentAggregatorError(c, oa)
|
return aggregator.NewInconsistentAggregatorError(c, oa)
|
||||||
@ -129,7 +129,7 @@ func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, _ *metric.Descriptor
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Update adds the recorded measurement to the current data set.
|
// Update adds the recorded measurement to the current data set.
|
||||||
// Update takes a lock to prevent concurrent Update() and SynchronizedCopy()
|
// Update takes a lock to prevent concurrent Update() and SynchronizedMove()
|
||||||
// calls.
|
// calls.
|
||||||
func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metric.Descriptor) error {
|
func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metric.Descriptor) error {
|
||||||
c.lock.Lock()
|
c.lock.Lock()
|
||||||
|
@ -80,7 +80,7 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
|
|||||||
test.CheckedUpdate(t, agg, y, descriptor)
|
test.CheckedUpdate(t, agg, y, descriptor)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := agg.SynchronizedCopy(ckpt, descriptor)
|
err := agg.SynchronizedMove(ckpt, descriptor)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
checkZero(t, agg, descriptor)
|
checkZero(t, agg, descriptor)
|
||||||
@ -156,8 +156,8 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
|
require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor))
|
||||||
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
|
require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor))
|
||||||
|
|
||||||
checkZero(t, agg1, descriptor)
|
checkZero(t, agg1, descriptor)
|
||||||
checkZero(t, agg1, descriptor)
|
checkZero(t, agg1, descriptor)
|
||||||
|
@ -111,11 +111,11 @@ func (c *Aggregator) Histogram() (aggregation.Buckets, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SynchronizedCopy saves the current state into oa and resets the current state to
|
// SynchronizedMove saves the current state into oa and resets the current state to
|
||||||
// the empty set. Since no locks are taken, there is a chance that
|
// the empty set. Since no locks are taken, there is a chance that
|
||||||
// the independent Sum, Count and Bucket Count are not consistent with each
|
// the independent Sum, Count and Bucket Count are not consistent with each
|
||||||
// other.
|
// other.
|
||||||
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, desc *metric.Descriptor) error {
|
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
|
||||||
o, _ := oa.(*Aggregator)
|
o, _ := oa.(*Aggregator)
|
||||||
if o == nil {
|
if o == nil {
|
||||||
return aggregator.NewInconsistentAggregatorError(c, oa)
|
return aggregator.NewInconsistentAggregatorError(c, oa)
|
||||||
|
@ -121,7 +121,7 @@ func testHistogram(t *testing.T, profile test.Profile, policy policy) {
|
|||||||
test.CheckedUpdate(t, agg, x, descriptor)
|
test.CheckedUpdate(t, agg, x, descriptor)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
|
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))
|
||||||
|
|
||||||
checkZero(t, agg, descriptor)
|
checkZero(t, agg, descriptor)
|
||||||
|
|
||||||
@ -184,8 +184,8 @@ func TestHistogramMerge(t *testing.T) {
|
|||||||
test.CheckedUpdate(t, agg2, x, descriptor)
|
test.CheckedUpdate(t, agg2, x, descriptor)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
|
require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor))
|
||||||
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
|
require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor))
|
||||||
|
|
||||||
test.CheckedMerge(t, ckpt1, ckpt2, descriptor)
|
test.CheckedMerge(t, ckpt1, ckpt2, descriptor)
|
||||||
|
|
||||||
@ -223,7 +223,7 @@ func TestHistogramNotSet(t *testing.T) {
|
|||||||
|
|
||||||
agg, ckpt := new2(descriptor)
|
agg, ckpt := new2(descriptor)
|
||||||
|
|
||||||
err := agg.SynchronizedCopy(ckpt, descriptor)
|
err := agg.SynchronizedMove(ckpt, descriptor)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
checkZero(t, agg, descriptor)
|
checkZero(t, agg, descriptor)
|
||||||
|
@ -90,8 +90,8 @@ func (g *Aggregator) LastValue() (metric.Number, time.Time, error) {
|
|||||||
return gd.value.AsNumber(), gd.timestamp, nil
|
return gd.value.AsNumber(), gd.timestamp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SynchronizedCopy atomically saves the current value.
|
// SynchronizedMove atomically saves the current value.
|
||||||
func (g *Aggregator) SynchronizedCopy(oa export.Aggregator, _ *metric.Descriptor) error {
|
func (g *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error {
|
||||||
o, _ := oa.(*Aggregator)
|
o, _ := oa.(*Aggregator)
|
||||||
if o == nil {
|
if o == nil {
|
||||||
return aggregator.NewInconsistentAggregatorError(g, oa)
|
return aggregator.NewInconsistentAggregatorError(g, oa)
|
||||||
|
@ -80,7 +80,7 @@ func TestLastValueUpdate(t *testing.T) {
|
|||||||
test.CheckedUpdate(t, agg, x, record)
|
test.CheckedUpdate(t, agg, x, record)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := agg.SynchronizedCopy(ckpt, record)
|
err := agg.SynchronizedMove(ckpt, record)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
lv, _, err := ckpt.LastValue()
|
lv, _, err := ckpt.LastValue()
|
||||||
@ -102,8 +102,8 @@ func TestLastValueMerge(t *testing.T) {
|
|||||||
test.CheckedUpdate(t, agg1, first1, descriptor)
|
test.CheckedUpdate(t, agg1, first1, descriptor)
|
||||||
test.CheckedUpdate(t, agg2, first2, descriptor)
|
test.CheckedUpdate(t, agg2, first2, descriptor)
|
||||||
|
|
||||||
require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
|
require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor))
|
||||||
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
|
require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor))
|
||||||
|
|
||||||
checkZero(t, agg1)
|
checkZero(t, agg1)
|
||||||
checkZero(t, agg2)
|
checkZero(t, agg2)
|
||||||
@ -127,7 +127,7 @@ func TestLastValueNotSet(t *testing.T) {
|
|||||||
descriptor := test.NewAggregatorTest(metric.ValueObserverKind, metric.Int64NumberKind)
|
descriptor := test.NewAggregatorTest(metric.ValueObserverKind, metric.Int64NumberKind)
|
||||||
|
|
||||||
g, ckpt := new2()
|
g, ckpt := new2()
|
||||||
require.NoError(t, g.SynchronizedCopy(ckpt, descriptor))
|
require.NoError(t, g.SynchronizedMove(ckpt, descriptor))
|
||||||
|
|
||||||
checkZero(t, g)
|
checkZero(t, g)
|
||||||
}
|
}
|
||||||
|
@ -48,7 +48,7 @@ var _ aggregation.MinMaxSumCount = &Aggregator{}
|
|||||||
// count. It does not compute quantile information other than Min and
|
// count. It does not compute quantile information other than Min and
|
||||||
// Max.
|
// Max.
|
||||||
//
|
//
|
||||||
// This type uses a mutex for Update() and SynchronizedCopy() concurrency.
|
// This type uses a mutex for Update() and SynchronizedMove() concurrency.
|
||||||
func New(cnt int, desc *metric.Descriptor) []Aggregator {
|
func New(cnt int, desc *metric.Descriptor) []Aggregator {
|
||||||
kind := desc.NumberKind()
|
kind := desc.NumberKind()
|
||||||
aggs := make([]Aggregator, cnt)
|
aggs := make([]Aggregator, cnt)
|
||||||
@ -101,9 +101,9 @@ func (c *Aggregator) Max() (metric.Number, error) {
|
|||||||
return c.max, nil
|
return c.max, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SynchronizedCopy saves the current state into oa and resets the current state to
|
// SynchronizedMove saves the current state into oa and resets the current state to
|
||||||
// the empty set.
|
// the empty set.
|
||||||
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, desc *metric.Descriptor) error {
|
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
|
||||||
o, _ := oa.(*Aggregator)
|
o, _ := oa.(*Aggregator)
|
||||||
if o == nil {
|
if o == nil {
|
||||||
return aggregator.NewInconsistentAggregatorError(c, oa)
|
return aggregator.NewInconsistentAggregatorError(c, oa)
|
||||||
|
@ -120,7 +120,7 @@ func minMaxSumCount(t *testing.T, profile test.Profile, policy policy) {
|
|||||||
test.CheckedUpdate(t, agg, x, descriptor)
|
test.CheckedUpdate(t, agg, x, descriptor)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
|
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))
|
||||||
|
|
||||||
checkZero(t, agg, descriptor)
|
checkZero(t, agg, descriptor)
|
||||||
|
|
||||||
@ -173,8 +173,8 @@ func TestMinMaxSumCountMerge(t *testing.T) {
|
|||||||
test.CheckedUpdate(t, agg2, x, descriptor)
|
test.CheckedUpdate(t, agg2, x, descriptor)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
|
require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor))
|
||||||
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
|
require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor))
|
||||||
|
|
||||||
checkZero(t, agg1, descriptor)
|
checkZero(t, agg1, descriptor)
|
||||||
checkZero(t, agg2, descriptor)
|
checkZero(t, agg2, descriptor)
|
||||||
@ -219,7 +219,7 @@ func TestMaxSumCountNotSet(t *testing.T) {
|
|||||||
alloc := New(2, descriptor)
|
alloc := New(2, descriptor)
|
||||||
agg, ckpt := &alloc[0], &alloc[1]
|
agg, ckpt := &alloc[0], &alloc[1]
|
||||||
|
|
||||||
require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
|
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))
|
||||||
|
|
||||||
asum, err := ckpt.Sum()
|
asum, err := ckpt.Sum()
|
||||||
require.Equal(t, metric.Number(0), asum, "Empty checkpoint sum = 0")
|
require.Equal(t, metric.Number(0), asum, "Empty checkpoint sum = 0")
|
||||||
|
@ -57,9 +57,9 @@ func (c *Aggregator) Sum() (metric.Number, error) {
|
|||||||
return c.value, nil
|
return c.value, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SynchronizedCopy atomically saves the current value into oa and resets the
|
// SynchronizedMove atomically saves the current value into oa and resets the
|
||||||
// current sum to zero.
|
// current sum to zero.
|
||||||
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, _ *metric.Descriptor) error {
|
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error {
|
||||||
o, _ := oa.(*Aggregator)
|
o, _ := oa.(*Aggregator)
|
||||||
if o == nil {
|
if o == nil {
|
||||||
return aggregator.NewInconsistentAggregatorError(c, oa)
|
return aggregator.NewInconsistentAggregatorError(c, oa)
|
||||||
|
@ -74,7 +74,7 @@ func TestCounterSum(t *testing.T) {
|
|||||||
test.CheckedUpdate(t, agg, x, descriptor)
|
test.CheckedUpdate(t, agg, x, descriptor)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := agg.SynchronizedCopy(ckpt, descriptor)
|
err := agg.SynchronizedMove(ckpt, descriptor)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
checkZero(t, agg, descriptor)
|
checkZero(t, agg, descriptor)
|
||||||
@ -102,7 +102,7 @@ func TestValueRecorderSum(t *testing.T) {
|
|||||||
sum.AddNumber(profile.NumberKind, r2)
|
sum.AddNumber(profile.NumberKind, r2)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
|
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))
|
||||||
checkZero(t, agg, descriptor)
|
checkZero(t, agg, descriptor)
|
||||||
|
|
||||||
asum, err := ckpt.Sum()
|
asum, err := ckpt.Sum()
|
||||||
@ -125,8 +125,8 @@ func TestCounterMerge(t *testing.T) {
|
|||||||
test.CheckedUpdate(t, agg2, x, descriptor)
|
test.CheckedUpdate(t, agg2, x, descriptor)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
|
require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor))
|
||||||
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
|
require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor))
|
||||||
|
|
||||||
checkZero(t, agg1, descriptor)
|
checkZero(t, agg1, descriptor)
|
||||||
checkZero(t, agg2, descriptor)
|
checkZero(t, agg2, descriptor)
|
||||||
|
@ -48,7 +48,7 @@ func TestStressInt64Histogram(t *testing.T) {
|
|||||||
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
for time.Since(startTime) < time.Second {
|
for time.Since(startTime) < time.Second {
|
||||||
require.NoError(t, h.SynchronizedCopy(ckpt, &desc))
|
require.NoError(t, h.SynchronizedMove(ckpt, &desc))
|
||||||
|
|
||||||
b, _ := ckpt.Histogram()
|
b, _ := ckpt.Histogram()
|
||||||
c, _ := ckpt.Count()
|
c, _ := ckpt.Count()
|
||||||
|
@ -220,7 +220,7 @@ func (b *Integrator) Process(accum export.Accumulation) error {
|
|||||||
// Accumulator's Aggregator into `value.delta` and sets
|
// Accumulator's Aggregator into `value.delta` and sets
|
||||||
// `value.current` appropriately to avoid this branch if
|
// `value.current` appropriately to avoid this branch if
|
||||||
// a third Accumulator is used.
|
// a third Accumulator is used.
|
||||||
err := value.current.SynchronizedCopy(value.delta, desc)
|
err := value.current.SynchronizedMove(value.delta, desc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -278,7 +278,7 @@ func (b *Integrator) FinishCollection() error {
|
|||||||
err = subt.Subtract(value.cumulative, value.delta, key.descriptor)
|
err = subt.Subtract(value.cumulative, value.delta, key.descriptor)
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = value.current.SynchronizedCopy(value.cumulative, key.descriptor)
|
err = value.current.SynchronizedMove(value.cumulative, key.descriptor)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
err = aggregation.ErrNoSubtraction
|
err = aggregation.ErrNoSubtraction
|
||||||
|
@ -47,7 +47,7 @@ func TestStressInt64MinMaxSumCount(t *testing.T) {
|
|||||||
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
for time.Since(startTime) < time.Second {
|
for time.Since(startTime) < time.Second {
|
||||||
_ = mmsc.SynchronizedCopy(ckpt, &desc)
|
_ = mmsc.SynchronizedMove(ckpt, &desc)
|
||||||
|
|
||||||
s, _ := ckpt.Sum()
|
s, _ := ckpt.Sum()
|
||||||
c, _ := ckpt.Count()
|
c, _ := ckpt.Count()
|
||||||
|
@ -438,7 +438,7 @@ func (m *Accumulator) checkpointRecord(r *record) int {
|
|||||||
if r.current == nil {
|
if r.current == nil {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
err := r.current.SynchronizedCopy(r.checkpoint, &r.inst.descriptor)
|
err := r.current.SynchronizedMove(r.checkpoint, &r.inst.descriptor)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
global.Handle(err)
|
global.Handle(err)
|
||||||
return 0
|
return 0
|
||||||
|
Reference in New Issue
Block a user