mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-03-03 14:52:56 +02:00
* Add tests for nonabsolute and varying sign values * Implement support for NonAbsolute Measurement MaxSumCount Previously, the MaxSumCount aggregator failed to work correctly with negative numbers (e.g. MeasureKind Alternate()==true). * Pass NumberKind to MaxSumCount New() function Allows it to set the initial state (current.max) to the correct value based on the NumberKind. * Revert extraneous local change * Pass full descriptor to msc New() This is analagous to the DDSketch New() constructor * Remember to run make precommit first * Add tests for empty checkpoint of MaxSumCount aggregator An empty checkpoint should have Sum() == 0, Count() == 0 and Max() still equal to the numberKind.Minimum() * Return ErrEmptyDataSet if no value set by the aggregator Remove TODO from stdout exporter to ensure that if a maxsumcount or ddsketch aggregator returns ErrEmptyDataSet from Max(), then the entire record will be skipped by the exporter. Added tests to ensure the exporter doesn't send any updates for EmptyDataSet checkpoints - for both ddsketch and maxsumcount. * Relayout Aggreggator struct to ensure int64s are 8-byte aligned On 32-bit architectures, Go only guarantees that primitive values are aligned to a 4 byte boundary. Atomic operations on 32-bit machines require 8-byte alignment. See https://github.com/golang/go/issues/599 * Addressing PR comments The use of Minimum() for the default uninitialized Maximum value means that in the unlikely condition that every recorded value for a measure is equal to the same NumberKind.Minimum(), then the aggregator's Max() will return ErrEmptyDataSet * Fix PR merge issue
144 lines
4.5 KiB
Go
144 lines
4.5 KiB
Go
// Copyright 2019, OpenTelemetry Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package maxsumcount // import "go.opentelemetry.io/otel/sdk/metric/aggregator/maxsumcount"
|
|
|
|
import (
|
|
"context"
|
|
|
|
"go.opentelemetry.io/otel/api/core"
|
|
export "go.opentelemetry.io/otel/sdk/export/metric"
|
|
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
|
)
|
|
|
|
type (
|
|
// Aggregator aggregates measure events, keeping only the max,
|
|
// sum, and count.
|
|
Aggregator struct {
|
|
current state
|
|
checkpoint state
|
|
kind core.NumberKind
|
|
}
|
|
|
|
state struct {
|
|
count core.Number
|
|
sum core.Number
|
|
max core.Number
|
|
}
|
|
)
|
|
|
|
// TODO: The SDK specification says this type should support Min
|
|
// values, see #319.
|
|
|
|
var _ export.Aggregator = &Aggregator{}
|
|
var _ aggregator.MaxSumCount = &Aggregator{}
|
|
|
|
// New returns a new measure aggregator for computing max, sum, and
|
|
// count. It does not compute quantile information other than Max.
|
|
//
|
|
// Note that this aggregator maintains each value using independent
|
|
// atomic operations, which introduces the possibility that
|
|
// checkpoints are inconsistent. For greater consistency and lower
|
|
// performance, consider using Array or DDSketch aggregators.
|
|
func New(desc *export.Descriptor) *Aggregator {
|
|
return &Aggregator{
|
|
kind: desc.NumberKind(),
|
|
current: unsetMaxSumCount(desc.NumberKind()),
|
|
}
|
|
}
|
|
|
|
func unsetMaxSumCount(kind core.NumberKind) state {
|
|
return state{max: kind.Minimum()}
|
|
}
|
|
|
|
// Sum returns the sum of values in the checkpoint.
|
|
func (c *Aggregator) Sum() (core.Number, error) {
|
|
return c.checkpoint.sum, nil
|
|
}
|
|
|
|
// Count returns the number of values in the checkpoint.
|
|
func (c *Aggregator) Count() (int64, error) {
|
|
return int64(c.checkpoint.count.AsUint64()), nil
|
|
}
|
|
|
|
// Max returns the maximum value in the checkpoint.
|
|
// The error value aggregator.ErrEmptyDataSet will be returned if
|
|
// (due to a race condition) the checkpoint was set prior to the
|
|
// current.max being computed in Update().
|
|
//
|
|
// Note: If a measure's recorded values for a given checkpoint are
|
|
// all equal to NumberKind.Minimum(), Max() will return ErrEmptyDataSet
|
|
func (c *Aggregator) Max() (core.Number, error) {
|
|
if c.checkpoint.max == c.kind.Minimum() {
|
|
return core.Number(0), aggregator.ErrEmptyDataSet
|
|
}
|
|
return c.checkpoint.max, nil
|
|
}
|
|
|
|
// Checkpoint saves the current state and resets the current state to
|
|
// the empty set. Since no locks are taken, there is a chance that
|
|
// the independent Max, Sum, and Count are not consistent with each
|
|
// other.
|
|
func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) {
|
|
// N.B. There is no atomic operation that can update all three
|
|
// values at once without a memory allocation.
|
|
//
|
|
// This aggregator is intended to trade this correctness for
|
|
// speed.
|
|
//
|
|
// Therefore, atomically swap fields independently, knowing
|
|
// that individually the three parts of this aggregation could
|
|
// be spread across multiple collections in rare cases.
|
|
|
|
c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0))
|
|
c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0))
|
|
c.checkpoint.max = c.current.max.SwapNumberAtomic(c.kind.Minimum())
|
|
}
|
|
|
|
// Update adds the recorded measurement to the current data set.
|
|
func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error {
|
|
kind := desc.NumberKind()
|
|
|
|
c.current.count.AddUint64Atomic(1)
|
|
c.current.sum.AddNumberAtomic(kind, number)
|
|
|
|
for {
|
|
current := c.current.max.AsNumberAtomic()
|
|
|
|
if number.CompareNumber(kind, current) <= 0 {
|
|
break
|
|
}
|
|
if c.current.max.CompareAndSwapNumber(current, number) {
|
|
break
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Merge combines two data sets into one.
|
|
func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error {
|
|
o, _ := oa.(*Aggregator)
|
|
if o == nil {
|
|
return aggregator.NewInconsistentMergeError(c, oa)
|
|
}
|
|
|
|
c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
|
|
c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count)
|
|
|
|
if c.checkpoint.max.CompareNumber(desc.NumberKind(), o.checkpoint.max) < 0 {
|
|
c.checkpoint.max.SetNumber(o.checkpoint.max)
|
|
}
|
|
return nil
|
|
}
|