You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
Merge branch 'master' into master
This commit is contained in:
@@ -116,7 +116,7 @@ func NewInconsistentMergeError(a1, a2 export.Aggregator) error {
|
||||
// RangeTest is a commmon routine for testing for valid input values.
|
||||
// This rejects NaN values. This rejects negative values when the
|
||||
// metric instrument does not support negative values, including
|
||||
// monotonic counter metrics and absolute measure metrics.
|
||||
// monotonic counter metrics and absolute ValueRecorder metrics.
|
||||
func RangeTest(number metric.Number, descriptor *metric.Descriptor) error {
|
||||
numberKind := descriptor.NumberKind()
|
||||
|
||||
|
||||
@@ -86,8 +86,8 @@ func TestNaNTest(t *testing.T) {
|
||||
t.Run(nkind.String(), func(t *testing.T) {
|
||||
for _, mkind := range []metric.Kind{
|
||||
metric.CounterKind,
|
||||
metric.MeasureKind,
|
||||
metric.ObserverKind,
|
||||
metric.ValueRecorderKind,
|
||||
metric.ValueObserverKind,
|
||||
} {
|
||||
desc := metric.NewDescriptor(
|
||||
"name",
|
||||
|
||||
@@ -100,22 +100,16 @@ type AggregationSelector interface {
|
||||
}
|
||||
|
||||
// Aggregator implements a specific aggregation behavior, e.g., a
|
||||
// behavior to track a sequence of updates to a counter, a measure, or
|
||||
// an observer instrument. For the most part, counter semantics are
|
||||
// fixed and the provided implementation should be used. Measure and
|
||||
// observer metrics offer a wide range of potential tradeoffs and
|
||||
// several implementations are provided.
|
||||
//
|
||||
// Aggregators are meant to compute the change (i.e., delta) in state
|
||||
// from one checkpoint to the next, with the exception of LastValue
|
||||
// aggregators. LastValue aggregators are required to maintain the last
|
||||
// value across checkpoints.
|
||||
// behavior to track a sequence of updates to an instrument. Sum-only
|
||||
// instruments commonly use a simple Sum aggregator, but for the
|
||||
// distribution instruments (ValueRecorder, ValueObserver) there are a
|
||||
// number of possible aggregators with different cost and accuracy
|
||||
// tradeoffs.
|
||||
//
|
||||
// Note that any Aggregator may be attached to any instrument--this is
|
||||
// the result of the OpenTelemetry API/SDK separation. It is possible
|
||||
// to attach a counter aggregator to a Measure instrument (to compute
|
||||
// a simple sum) or a LastValue aggregator to a measure instrument (to
|
||||
// compute the last value).
|
||||
// to attach a Sum aggregator to a ValueRecorder instrument or a
|
||||
// MinMaxSumCount aggregator to a Counter instrument.
|
||||
type Aggregator interface {
|
||||
// Update receives a new measured value and incorporates it
|
||||
// into the aggregation. Update() calls may arrive
|
||||
|
||||
@@ -27,6 +27,8 @@ import (
|
||||
)
|
||||
|
||||
type (
|
||||
// Aggregator aggregates events that form a distribution, keeping
|
||||
// an array with the exact set of values.
|
||||
Aggregator struct {
|
||||
// ckptSum needs to be aligned for 64-bit atomic operations.
|
||||
ckptSum metric.Number
|
||||
|
||||
@@ -50,7 +50,7 @@ type updateTest struct {
|
||||
}
|
||||
|
||||
func (ut *updateTest) run(t *testing.T, profile test.Profile) {
|
||||
descriptor := test.NewAggregatorTest(metric.MeasureKind, profile.NumberKind)
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg := New()
|
||||
|
||||
@@ -118,7 +118,7 @@ type mergeTest struct {
|
||||
func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
|
||||
ctx := context.Background()
|
||||
|
||||
descriptor := test.NewAggregatorTest(metric.MeasureKind, profile.NumberKind)
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg1 := New()
|
||||
agg2 := New()
|
||||
@@ -215,7 +215,7 @@ func TestArrayErrors(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
descriptor := test.NewAggregatorTest(metric.MeasureKind, profile.NumberKind)
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
test.CheckedUpdate(t, agg, metric.Number(0), descriptor)
|
||||
|
||||
@@ -243,7 +243,7 @@ func TestArrayErrors(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestArrayFloat64(t *testing.T) {
|
||||
descriptor := test.NewAggregatorTest(metric.MeasureKind, metric.Float64NumberKind)
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, metric.Float64NumberKind)
|
||||
|
||||
fpsf := func(sign int) []float64 {
|
||||
// Check behavior of a bunch of odd floating
|
||||
|
||||
@@ -29,7 +29,7 @@ import (
|
||||
// Config is an alias for the underlying DDSketch config object.
|
||||
type Config = sdk.Config
|
||||
|
||||
// Aggregator aggregates measure events.
|
||||
// Aggregator aggregates events into a distribution.
|
||||
type Aggregator struct {
|
||||
lock sync.Mutex
|
||||
cfg *Config
|
||||
|
||||
@@ -33,7 +33,7 @@ type updateTest struct {
|
||||
func (ut *updateTest) run(t *testing.T, profile test.Profile) {
|
||||
ctx := context.Background()
|
||||
|
||||
descriptor := test.NewAggregatorTest(metric.MeasureKind, profile.NumberKind)
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
agg := New(NewDefaultConfig(), descriptor)
|
||||
|
||||
all := test.NewNumbers(profile.NumberKind)
|
||||
@@ -92,7 +92,7 @@ type mergeTest struct {
|
||||
|
||||
func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
|
||||
ctx := context.Background()
|
||||
descriptor := test.NewAggregatorTest(metric.MeasureKind, profile.NumberKind)
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg1 := New(NewDefaultConfig(), descriptor)
|
||||
agg2 := New(NewDefaultConfig(), descriptor)
|
||||
|
||||
@@ -24,6 +24,11 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
)
|
||||
|
||||
// Note: This code uses a Mutex to govern access to the exclusive
|
||||
// aggregator state. This is in contrast to a lock-free approach
|
||||
// (as in the Go prometheus client) that was reverted here:
|
||||
// https://github.com/open-telemetry/opentelemetry-go/pull/669
|
||||
|
||||
type (
|
||||
// Aggregator observe events and counts them in pre-determined buckets.
|
||||
// It also calculates the sum and count of all events.
|
||||
@@ -39,10 +44,9 @@ type (
|
||||
// the sum and counts for all observed values and
|
||||
// the less than equal bucket count for the pre-determined boundaries.
|
||||
state struct {
|
||||
// all fields have to be aligned for 64-bit atomic operations.
|
||||
buckets aggregator.Buckets
|
||||
count metric.Number
|
||||
sum metric.Number
|
||||
bucketCounts []metric.Number
|
||||
count metric.Number
|
||||
sum metric.Number
|
||||
}
|
||||
)
|
||||
|
||||
@@ -51,7 +55,7 @@ var _ aggregator.Sum = &Aggregator{}
|
||||
var _ aggregator.Count = &Aggregator{}
|
||||
var _ aggregator.Histogram = &Aggregator{}
|
||||
|
||||
// New returns a new measure aggregator for computing Histograms.
|
||||
// New returns a new aggregator for computing Histograms.
|
||||
//
|
||||
// A Histogram observe events and counts them in pre-defined buckets.
|
||||
// And also provides the total sum and count of all observations.
|
||||
@@ -71,17 +75,12 @@ func New(desc *metric.Descriptor, boundaries []metric.Number) *Aggregator {
|
||||
sort.Sort(&sortedBoundaries)
|
||||
boundaries = sortedBoundaries.numbers
|
||||
|
||||
agg := Aggregator{
|
||||
return &Aggregator{
|
||||
kind: desc.NumberKind(),
|
||||
boundaries: boundaries,
|
||||
current: state{
|
||||
buckets: aggregator.Buckets{
|
||||
Boundaries: boundaries,
|
||||
Counts: make([]metric.Number, len(boundaries)+1),
|
||||
},
|
||||
},
|
||||
current: emptyState(boundaries),
|
||||
checkpoint: emptyState(boundaries),
|
||||
}
|
||||
return &agg
|
||||
}
|
||||
|
||||
// Sum returns the sum of all values in the checkpoint.
|
||||
@@ -102,7 +101,10 @@ func (c *Aggregator) Count() (int64, error) {
|
||||
func (c *Aggregator) Histogram() (aggregator.Buckets, error) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
return c.checkpoint.buckets, nil
|
||||
return aggregator.Buckets{
|
||||
Boundaries: c.boundaries,
|
||||
Counts: c.checkpoint.bucketCounts,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Checkpoint saves the current state and resets the current state to
|
||||
@@ -111,16 +113,13 @@ func (c *Aggregator) Histogram() (aggregator.Buckets, error) {
|
||||
// other.
|
||||
func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) {
|
||||
c.lock.Lock()
|
||||
c.checkpoint, c.current = c.current, c.emptyState()
|
||||
c.checkpoint, c.current = c.current, emptyState(c.boundaries)
|
||||
c.lock.Unlock()
|
||||
}
|
||||
|
||||
func (c *Aggregator) emptyState() state {
|
||||
func emptyState(boundaries []metric.Number) state {
|
||||
return state{
|
||||
buckets: aggregator.Buckets{
|
||||
Boundaries: c.boundaries,
|
||||
Counts: make([]metric.Number, len(c.boundaries)+1),
|
||||
},
|
||||
bucketCounts: make([]metric.Number, len(boundaries)+1),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -141,7 +140,7 @@ func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metri
|
||||
|
||||
c.current.count.AddInt64(1)
|
||||
c.current.sum.AddNumber(kind, number)
|
||||
c.current.buckets.Counts[bucketID].AddUint64(1)
|
||||
c.current.bucketCounts[bucketID].AddUint64(1)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -156,8 +155,8 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error
|
||||
c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
|
||||
c.checkpoint.count.AddNumber(metric.Uint64NumberKind, o.checkpoint.count)
|
||||
|
||||
for i := 0; i < len(c.checkpoint.buckets.Counts); i++ {
|
||||
c.checkpoint.buckets.Counts[i].AddNumber(metric.Uint64NumberKind, o.checkpoint.buckets.Counts[i])
|
||||
for i := 0; i < len(c.checkpoint.bucketCounts); i++ {
|
||||
c.checkpoint.bucketCounts[i].AddNumber(metric.Uint64NumberKind, o.checkpoint.bucketCounts[i])
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -84,7 +84,7 @@ func TestHistogramPositiveAndNegative(t *testing.T) {
|
||||
// Validates count, sum and buckets for a given profile and policy
|
||||
func histogram(t *testing.T, profile test.Profile, policy policy) {
|
||||
ctx := context.Background()
|
||||
descriptor := test.NewAggregatorTest(metric.MeasureKind, profile.NumberKind)
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg := New(descriptor, boundaries[profile.NumberKind])
|
||||
|
||||
@@ -113,20 +113,33 @@ func histogram(t *testing.T, profile test.Profile, policy policy) {
|
||||
require.Equal(t, all.Count(), count, "Same count -"+policy.name)
|
||||
require.Nil(t, err)
|
||||
|
||||
require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
|
||||
require.Equal(t, len(agg.checkpoint.bucketCounts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
|
||||
|
||||
counts := calcBuckets(all.Points(), profile)
|
||||
for i, v := range counts {
|
||||
bCount := agg.checkpoint.buckets.Counts[i].AsUint64()
|
||||
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint.buckets.Counts)
|
||||
bCount := agg.checkpoint.bucketCounts[i].AsUint64()
|
||||
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint.bucketCounts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHistogramInitial(t *testing.T) {
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg := New(descriptor, boundaries[profile.NumberKind])
|
||||
buckets, err := agg.Histogram()
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(buckets.Counts), len(boundaries[profile.NumberKind])+1)
|
||||
require.Equal(t, len(buckets.Boundaries), len(boundaries[profile.NumberKind]))
|
||||
})
|
||||
}
|
||||
|
||||
func TestHistogramMerge(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
descriptor := test.NewAggregatorTest(metric.MeasureKind, profile.NumberKind)
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg1 := New(descriptor, boundaries[profile.NumberKind])
|
||||
agg2 := New(descriptor, boundaries[profile.NumberKind])
|
||||
@@ -164,12 +177,12 @@ func TestHistogramMerge(t *testing.T) {
|
||||
require.Equal(t, all.Count(), count, "Same count - absolute")
|
||||
require.Nil(t, err)
|
||||
|
||||
require.Equal(t, len(agg1.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
|
||||
require.Equal(t, len(agg1.checkpoint.bucketCounts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
|
||||
|
||||
counts := calcBuckets(all.Points(), profile)
|
||||
for i, v := range counts {
|
||||
bCount := agg1.checkpoint.buckets.Counts[i].AsUint64()
|
||||
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint.buckets.Counts)
|
||||
bCount := agg1.checkpoint.bucketCounts[i].AsUint64()
|
||||
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint.bucketCounts)
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -178,7 +191,7 @@ func TestHistogramNotSet(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
descriptor := test.NewAggregatorTest(metric.MeasureKind, profile.NumberKind)
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg := New(descriptor, boundaries[profile.NumberKind])
|
||||
agg.Checkpoint(ctx, descriptor)
|
||||
@@ -191,8 +204,8 @@ func TestHistogramNotSet(t *testing.T) {
|
||||
require.Equal(t, int64(0), count, "Empty checkpoint count = 0")
|
||||
require.Nil(t, err)
|
||||
|
||||
require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
|
||||
for i, bCount := range agg.checkpoint.buckets.Counts {
|
||||
require.Equal(t, len(agg.checkpoint.bucketCounts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
|
||||
for i, bCount := range agg.checkpoint.bucketCounts {
|
||||
require.Equal(t, uint64(0), bCount.AsUint64(), "Bucket #%d must have 0 observed values", i)
|
||||
}
|
||||
})
|
||||
|
||||
@@ -55,7 +55,7 @@ func TestLastValueUpdate(t *testing.T) {
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
agg := New()
|
||||
|
||||
record := test.NewAggregatorTest(metric.ObserverKind, profile.NumberKind)
|
||||
record := test.NewAggregatorTest(metric.ValueObserverKind, profile.NumberKind)
|
||||
|
||||
var last metric.Number
|
||||
for i := 0; i < count; i++ {
|
||||
@@ -79,7 +79,7 @@ func TestLastValueMerge(t *testing.T) {
|
||||
agg1 := New()
|
||||
agg2 := New()
|
||||
|
||||
descriptor := test.NewAggregatorTest(metric.ObserverKind, profile.NumberKind)
|
||||
descriptor := test.NewAggregatorTest(metric.ValueObserverKind, profile.NumberKind)
|
||||
|
||||
first1 := profile.Random(+1)
|
||||
first2 := profile.Random(+1)
|
||||
@@ -107,7 +107,7 @@ func TestLastValueMerge(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLastValueNotSet(t *testing.T) {
|
||||
descriptor := test.NewAggregatorTest(metric.ObserverKind, metric.Int64NumberKind)
|
||||
descriptor := test.NewAggregatorTest(metric.ValueObserverKind, metric.Int64NumberKind)
|
||||
|
||||
g := New()
|
||||
g.Checkpoint(context.Background(), descriptor)
|
||||
|
||||
@@ -24,8 +24,8 @@ import (
|
||||
)
|
||||
|
||||
type (
|
||||
// Aggregator aggregates measure events, keeping only the min, max,
|
||||
// sum, and count.
|
||||
// Aggregator aggregates events that form a distribution,
|
||||
// keeping only the min, max, sum, and count.
|
||||
Aggregator struct {
|
||||
lock sync.Mutex
|
||||
current state
|
||||
@@ -44,8 +44,9 @@ type (
|
||||
var _ export.Aggregator = &Aggregator{}
|
||||
var _ aggregator.MinMaxSumCount = &Aggregator{}
|
||||
|
||||
// New returns a new measure aggregator for computing min, max, sum, and
|
||||
// count. It does not compute quantile information other than Min and Max.
|
||||
// New returns a new aggregator for computing the min, max, sum, and
|
||||
// count. It does not compute quantile information other than Min and
|
||||
// Max.
|
||||
//
|
||||
// This type uses a mutex for Update() and Checkpoint() concurrency.
|
||||
func New(desc *metric.Descriptor) *Aggregator {
|
||||
|
||||
@@ -79,7 +79,7 @@ func TestMinMaxSumCountPositiveAndNegative(t *testing.T) {
|
||||
// Validates min, max, sum and count for a given profile and policy
|
||||
func minMaxSumCount(t *testing.T, profile test.Profile, policy policy) {
|
||||
ctx := context.Background()
|
||||
descriptor := test.NewAggregatorTest(metric.MeasureKind, profile.NumberKind)
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg := New(descriptor)
|
||||
|
||||
@@ -127,7 +127,7 @@ func TestMinMaxSumCountMerge(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
descriptor := test.NewAggregatorTest(metric.MeasureKind, profile.NumberKind)
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg1 := New(descriptor)
|
||||
agg2 := New(descriptor)
|
||||
@@ -185,7 +185,7 @@ func TestMaxSumCountNotSet(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
descriptor := test.NewAggregatorTest(metric.MeasureKind, profile.NumberKind)
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg := New(descriptor)
|
||||
agg.Checkpoint(ctx, descriptor)
|
||||
|
||||
@@ -71,13 +71,13 @@ func TestCounterSum(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestMeasureSum(t *testing.T) {
|
||||
func TestValueRecorderSum(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
agg := New()
|
||||
|
||||
descriptor := test.NewAggregatorTest(metric.MeasureKind, profile.NumberKind)
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
sum := metric.Number(0)
|
||||
|
||||
|
||||
@@ -311,7 +311,7 @@ func BenchmarkInt64LastValueAdd(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
fix := newFixture(b)
|
||||
labs := makeLabels(1)
|
||||
mea := fix.meter.NewInt64Measure("int64.lastvalue")
|
||||
mea := fix.meter.NewInt64ValueRecorder("int64.lastvalue")
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
@@ -324,7 +324,7 @@ func BenchmarkInt64LastValueHandleAdd(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
fix := newFixture(b)
|
||||
labs := makeLabels(1)
|
||||
mea := fix.meter.NewInt64Measure("int64.lastvalue")
|
||||
mea := fix.meter.NewInt64ValueRecorder("int64.lastvalue")
|
||||
handle := mea.Bind(labs...)
|
||||
|
||||
b.ResetTimer()
|
||||
@@ -338,7 +338,7 @@ func BenchmarkFloat64LastValueAdd(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
fix := newFixture(b)
|
||||
labs := makeLabels(1)
|
||||
mea := fix.meter.NewFloat64Measure("float64.lastvalue")
|
||||
mea := fix.meter.NewFloat64ValueRecorder("float64.lastvalue")
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
@@ -351,7 +351,7 @@ func BenchmarkFloat64LastValueHandleAdd(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
fix := newFixture(b)
|
||||
labs := makeLabels(1)
|
||||
mea := fix.meter.NewFloat64Measure("float64.lastvalue")
|
||||
mea := fix.meter.NewFloat64ValueRecorder("float64.lastvalue")
|
||||
handle := mea.Bind(labs...)
|
||||
|
||||
b.ResetTimer()
|
||||
@@ -361,13 +361,13 @@ func BenchmarkFloat64LastValueHandleAdd(b *testing.B) {
|
||||
}
|
||||
}
|
||||
|
||||
// Measures
|
||||
// ValueRecorders
|
||||
|
||||
func benchmarkInt64MeasureAdd(b *testing.B, name string) {
|
||||
func benchmarkInt64ValueRecorderAdd(b *testing.B, name string) {
|
||||
ctx := context.Background()
|
||||
fix := newFixture(b)
|
||||
labs := makeLabels(1)
|
||||
mea := fix.meter.NewInt64Measure(name)
|
||||
mea := fix.meter.NewInt64ValueRecorder(name)
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
@@ -376,11 +376,11 @@ func benchmarkInt64MeasureAdd(b *testing.B, name string) {
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkInt64MeasureHandleAdd(b *testing.B, name string) {
|
||||
func benchmarkInt64ValueRecorderHandleAdd(b *testing.B, name string) {
|
||||
ctx := context.Background()
|
||||
fix := newFixture(b)
|
||||
labs := makeLabels(1)
|
||||
mea := fix.meter.NewInt64Measure(name)
|
||||
mea := fix.meter.NewInt64ValueRecorder(name)
|
||||
handle := mea.Bind(labs...)
|
||||
|
||||
b.ResetTimer()
|
||||
@@ -390,11 +390,11 @@ func benchmarkInt64MeasureHandleAdd(b *testing.B, name string) {
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkFloat64MeasureAdd(b *testing.B, name string) {
|
||||
func benchmarkFloat64ValueRecorderAdd(b *testing.B, name string) {
|
||||
ctx := context.Background()
|
||||
fix := newFixture(b)
|
||||
labs := makeLabels(1)
|
||||
mea := fix.meter.NewFloat64Measure(name)
|
||||
mea := fix.meter.NewFloat64ValueRecorder(name)
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
@@ -403,11 +403,11 @@ func benchmarkFloat64MeasureAdd(b *testing.B, name string) {
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkFloat64MeasureHandleAdd(b *testing.B, name string) {
|
||||
func benchmarkFloat64ValueRecorderHandleAdd(b *testing.B, name string) {
|
||||
ctx := context.Background()
|
||||
fix := newFixture(b)
|
||||
labs := makeLabels(1)
|
||||
mea := fix.meter.NewFloat64Measure(name)
|
||||
mea := fix.meter.NewFloat64ValueRecorder(name)
|
||||
handle := mea.Bind(labs...)
|
||||
|
||||
b.ResetTimer()
|
||||
@@ -423,22 +423,22 @@ func BenchmarkObserverRegistration(b *testing.B) {
|
||||
fix := newFixture(b)
|
||||
names := make([]string, 0, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
names = append(names, fmt.Sprintf("test.observer.%d", i))
|
||||
names = append(names, fmt.Sprintf("test.valueobserver.%d", i))
|
||||
}
|
||||
cb := func(result metric.Int64ObserverResult) {}
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
fix.meter.RegisterInt64Observer(names[i], cb)
|
||||
fix.meter.RegisterInt64ValueObserver(names[i], cb)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkObserverObservationInt64(b *testing.B) {
|
||||
func BenchmarkValueObserverObservationInt64(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
fix := newFixture(b)
|
||||
labs := makeLabels(1)
|
||||
_ = fix.meter.RegisterInt64Observer("test.observer", func(result metric.Int64ObserverResult) {
|
||||
_ = fix.meter.RegisterInt64ValueObserver("test.valueobserver", func(result metric.Int64ObserverResult) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
result.Observe((int64)(i), labs...)
|
||||
}
|
||||
@@ -449,11 +449,11 @@ func BenchmarkObserverObservationInt64(b *testing.B) {
|
||||
fix.accumulator.Collect(ctx)
|
||||
}
|
||||
|
||||
func BenchmarkObserverObservationFloat64(b *testing.B) {
|
||||
func BenchmarkValueObserverObservationFloat64(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
fix := newFixture(b)
|
||||
labs := makeLabels(1)
|
||||
_ = fix.meter.RegisterFloat64Observer("test.observer", func(result metric.Float64ObserverResult) {
|
||||
_ = fix.meter.RegisterFloat64ValueObserver("test.valueobserver", func(result metric.Float64ObserverResult) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
result.Observe((float64)(i), labs...)
|
||||
}
|
||||
@@ -467,55 +467,55 @@ func BenchmarkObserverObservationFloat64(b *testing.B) {
|
||||
// MaxSumCount
|
||||
|
||||
func BenchmarkInt64MaxSumCountAdd(b *testing.B) {
|
||||
benchmarkInt64MeasureAdd(b, "int64.minmaxsumcount")
|
||||
benchmarkInt64ValueRecorderAdd(b, "int64.minmaxsumcount")
|
||||
}
|
||||
|
||||
func BenchmarkInt64MaxSumCountHandleAdd(b *testing.B) {
|
||||
benchmarkInt64MeasureHandleAdd(b, "int64.minmaxsumcount")
|
||||
benchmarkInt64ValueRecorderHandleAdd(b, "int64.minmaxsumcount")
|
||||
}
|
||||
|
||||
func BenchmarkFloat64MaxSumCountAdd(b *testing.B) {
|
||||
benchmarkFloat64MeasureAdd(b, "float64.minmaxsumcount")
|
||||
benchmarkFloat64ValueRecorderAdd(b, "float64.minmaxsumcount")
|
||||
}
|
||||
|
||||
func BenchmarkFloat64MaxSumCountHandleAdd(b *testing.B) {
|
||||
benchmarkFloat64MeasureHandleAdd(b, "float64.minmaxsumcount")
|
||||
benchmarkFloat64ValueRecorderHandleAdd(b, "float64.minmaxsumcount")
|
||||
}
|
||||
|
||||
// DDSketch
|
||||
|
||||
func BenchmarkInt64DDSketchAdd(b *testing.B) {
|
||||
benchmarkInt64MeasureAdd(b, "int64.ddsketch")
|
||||
benchmarkInt64ValueRecorderAdd(b, "int64.ddsketch")
|
||||
}
|
||||
|
||||
func BenchmarkInt64DDSketchHandleAdd(b *testing.B) {
|
||||
benchmarkInt64MeasureHandleAdd(b, "int64.ddsketch")
|
||||
benchmarkInt64ValueRecorderHandleAdd(b, "int64.ddsketch")
|
||||
}
|
||||
|
||||
func BenchmarkFloat64DDSketchAdd(b *testing.B) {
|
||||
benchmarkFloat64MeasureAdd(b, "float64.ddsketch")
|
||||
benchmarkFloat64ValueRecorderAdd(b, "float64.ddsketch")
|
||||
}
|
||||
|
||||
func BenchmarkFloat64DDSketchHandleAdd(b *testing.B) {
|
||||
benchmarkFloat64MeasureHandleAdd(b, "float64.ddsketch")
|
||||
benchmarkFloat64ValueRecorderHandleAdd(b, "float64.ddsketch")
|
||||
}
|
||||
|
||||
// Array
|
||||
|
||||
func BenchmarkInt64ArrayAdd(b *testing.B) {
|
||||
benchmarkInt64MeasureAdd(b, "int64.array")
|
||||
benchmarkInt64ValueRecorderAdd(b, "int64.array")
|
||||
}
|
||||
|
||||
func BenchmarkInt64ArrayHandleAdd(b *testing.B) {
|
||||
benchmarkInt64MeasureHandleAdd(b, "int64.array")
|
||||
benchmarkInt64ValueRecorderHandleAdd(b, "int64.array")
|
||||
}
|
||||
|
||||
func BenchmarkFloat64ArrayAdd(b *testing.B) {
|
||||
benchmarkFloat64MeasureAdd(b, "float64.array")
|
||||
benchmarkFloat64ValueRecorderAdd(b, "float64.array")
|
||||
}
|
||||
|
||||
func BenchmarkFloat64ArrayHandleAdd(b *testing.B) {
|
||||
benchmarkFloat64MeasureHandleAdd(b, "float64.array")
|
||||
benchmarkFloat64ValueRecorderHandleAdd(b, "float64.array")
|
||||
}
|
||||
|
||||
// BatchRecord
|
||||
|
||||
@@ -32,8 +32,6 @@ type Controller struct {
|
||||
collectLock sync.Mutex
|
||||
accumulator *sdk.Accumulator
|
||||
resource *resource.Resource
|
||||
uniq metric.MeterImpl
|
||||
named map[string]metric.Meter
|
||||
errorHandler sdk.ErrorHandler
|
||||
integrator export.Integrator
|
||||
exporter export.Exporter
|
||||
@@ -42,10 +40,9 @@ type Controller struct {
|
||||
period time.Duration
|
||||
ticker Ticker
|
||||
clock Clock
|
||||
provider *registry.Provider
|
||||
}
|
||||
|
||||
var _ metric.Provider = &Controller{}
|
||||
|
||||
// Several types below are created to match "github.com/benbjohnson/clock"
|
||||
// so that it remains a test-only dependency.
|
||||
|
||||
@@ -83,8 +80,7 @@ func New(integrator export.Integrator, exporter export.Exporter, period time.Dur
|
||||
return &Controller{
|
||||
accumulator: impl,
|
||||
resource: c.Resource,
|
||||
uniq: registry.NewUniqueInstrumentMeterImpl(impl),
|
||||
named: map[string]metric.Meter{},
|
||||
provider: registry.NewProvider(impl),
|
||||
errorHandler: c.ErrorHandler,
|
||||
integrator: integrator,
|
||||
exporter: exporter,
|
||||
@@ -102,6 +98,8 @@ func (c *Controller) SetClock(clock Clock) {
|
||||
c.clock = clock
|
||||
}
|
||||
|
||||
// SetErrorHandler sets the handler for errors. If none has been set, the
|
||||
// SDK default error handler is used.
|
||||
func (c *Controller) SetErrorHandler(errorHandler sdk.ErrorHandler) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
@@ -109,19 +107,9 @@ func (c *Controller) SetErrorHandler(errorHandler sdk.ErrorHandler) {
|
||||
c.accumulator.SetErrorHandler(errorHandler)
|
||||
}
|
||||
|
||||
// Meter returns a named Meter, satisifying the metric.Provider
|
||||
// interface.
|
||||
func (c *Controller) Meter(name string) metric.Meter {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
if meter, ok := c.named[name]; ok {
|
||||
return meter
|
||||
}
|
||||
|
||||
meter := metric.WrapMeterImpl(c.uniq, name)
|
||||
c.named[name] = meter
|
||||
return meter
|
||||
// Provider returns a metric.Provider instance for this controller.
|
||||
func (c *Controller) Provider() metric.Provider {
|
||||
return c.provider
|
||||
}
|
||||
|
||||
// Start begins a ticker that periodically collects and exports
|
||||
|
||||
@@ -183,7 +183,7 @@ func TestPushTicker(t *testing.T) {
|
||||
fix := newFixture(t)
|
||||
|
||||
p := push.New(fix.integrator, fix.exporter, time.Second)
|
||||
meter := p.Meter("name")
|
||||
meter := p.Provider().Meter("name")
|
||||
|
||||
mock := mockClock{clock.NewMock()}
|
||||
p.SetClock(mock)
|
||||
@@ -280,7 +280,7 @@ func TestPushExportError(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
meter := p.Meter("name")
|
||||
meter := p.Provider().Meter("name")
|
||||
counter1 := metric.Must(meter).NewInt64Counter("counter1")
|
||||
counter2 := metric.Must(meter).NewInt64Counter("counter2")
|
||||
|
||||
|
||||
+30
-30
@@ -107,7 +107,7 @@ func TestInputRangeTestCounter(t *testing.T) {
|
||||
require.Nil(t, sdkErr)
|
||||
}
|
||||
|
||||
func TestInputRangeTestMeasure(t *testing.T) {
|
||||
func TestInputRangeTestValueRecorder(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
integrator := &correctnessIntegrator{
|
||||
t: t,
|
||||
@@ -120,17 +120,17 @@ func TestInputRangeTestMeasure(t *testing.T) {
|
||||
sdkErr = handleErr
|
||||
})
|
||||
|
||||
measure := Must(meter).NewFloat64Measure("name.measure")
|
||||
valuerecorder := Must(meter).NewFloat64ValueRecorder("name.valuerecorder")
|
||||
|
||||
measure.Record(ctx, math.NaN())
|
||||
valuerecorder.Record(ctx, math.NaN())
|
||||
require.Equal(t, aggregator.ErrNaNInput, sdkErr)
|
||||
sdkErr = nil
|
||||
|
||||
checkpointed := sdk.Collect(ctx)
|
||||
require.Equal(t, 0, checkpointed)
|
||||
|
||||
measure.Record(ctx, 1)
|
||||
measure.Record(ctx, 2)
|
||||
valuerecorder.Record(ctx, 1)
|
||||
valuerecorder.Record(ctx, 2)
|
||||
|
||||
integrator.records = nil
|
||||
checkpointed = sdk.Collect(ctx)
|
||||
@@ -150,9 +150,9 @@ func TestDisabledInstrument(t *testing.T) {
|
||||
sdk := metricsdk.NewAccumulator(integrator)
|
||||
meter := metric.WrapMeterImpl(sdk, "test")
|
||||
|
||||
measure := Must(meter).NewFloat64Measure("name.disabled")
|
||||
valuerecorder := Must(meter).NewFloat64ValueRecorder("name.disabled")
|
||||
|
||||
measure.Record(ctx, -1)
|
||||
valuerecorder.Record(ctx, -1)
|
||||
checkpointed := sdk.Collect(ctx)
|
||||
|
||||
require.Equal(t, 0, checkpointed)
|
||||
@@ -291,20 +291,20 @@ func TestObserverCollection(t *testing.T) {
|
||||
sdk := metricsdk.NewAccumulator(integrator)
|
||||
meter := metric.WrapMeterImpl(sdk, "test")
|
||||
|
||||
_ = Must(meter).RegisterFloat64Observer("float.observer", func(result metric.Float64ObserverResult) {
|
||||
_ = Must(meter).RegisterFloat64ValueObserver("float.valueobserver", func(result metric.Float64ObserverResult) {
|
||||
result.Observe(1, kv.String("A", "B"))
|
||||
// last value wins
|
||||
result.Observe(-1, kv.String("A", "B"))
|
||||
result.Observe(-1, kv.String("C", "D"))
|
||||
})
|
||||
_ = Must(meter).RegisterInt64Observer("int.observer", func(result metric.Int64ObserverResult) {
|
||||
_ = Must(meter).RegisterInt64ValueObserver("int.valueobserver", func(result metric.Int64ObserverResult) {
|
||||
result.Observe(-1, kv.String("A", "B"))
|
||||
result.Observe(1)
|
||||
// last value wins
|
||||
result.Observe(1, kv.String("A", "B"))
|
||||
result.Observe(1)
|
||||
})
|
||||
_ = Must(meter).RegisterInt64Observer("empty.observer", func(result metric.Int64ObserverResult) {
|
||||
_ = Must(meter).RegisterInt64ValueObserver("empty.valueobserver", func(result metric.Int64ObserverResult) {
|
||||
})
|
||||
|
||||
collected := sdk.Collect(ctx)
|
||||
@@ -317,10 +317,10 @@ func TestObserverCollection(t *testing.T) {
|
||||
_ = out.AddTo(rec)
|
||||
}
|
||||
require.EqualValues(t, map[string]float64{
|
||||
"float.observer/A=B": -1,
|
||||
"float.observer/C=D": -1,
|
||||
"int.observer/": 1,
|
||||
"int.observer/A=B": 1,
|
||||
"float.valueobserver/A=B": -1,
|
||||
"float.valueobserver/C=D": -1,
|
||||
"int.valueobserver/": 1,
|
||||
"int.valueobserver/A=B": 1,
|
||||
}, out.Map)
|
||||
}
|
||||
|
||||
@@ -333,8 +333,8 @@ func TestObserverBatch(t *testing.T) {
|
||||
sdk := metricsdk.NewAccumulator(integrator)
|
||||
meter := metric.WrapMeterImpl(sdk, "test")
|
||||
|
||||
var floatObs metric.Float64Observer
|
||||
var intObs metric.Int64Observer
|
||||
var floatObs metric.Float64ValueObserver
|
||||
var intObs metric.Int64ValueObserver
|
||||
var batch = Must(meter).NewBatchObserver(
|
||||
func(result metric.BatchObserverResult) {
|
||||
result.Observe(
|
||||
@@ -358,8 +358,8 @@ func TestObserverBatch(t *testing.T) {
|
||||
intObs.Observation(1),
|
||||
)
|
||||
})
|
||||
floatObs = batch.RegisterFloat64Observer("float.observer")
|
||||
intObs = batch.RegisterInt64Observer("int.observer")
|
||||
floatObs = batch.RegisterFloat64ValueObserver("float.valueobserver")
|
||||
intObs = batch.RegisterInt64ValueObserver("int.valueobserver")
|
||||
|
||||
collected := sdk.Collect(ctx)
|
||||
|
||||
@@ -371,10 +371,10 @@ func TestObserverBatch(t *testing.T) {
|
||||
_ = out.AddTo(rec)
|
||||
}
|
||||
require.EqualValues(t, map[string]float64{
|
||||
"float.observer/A=B": -1,
|
||||
"float.observer/C=D": -1,
|
||||
"int.observer/": 1,
|
||||
"int.observer/A=B": 1,
|
||||
"float.valueobserver/A=B": -1,
|
||||
"float.valueobserver/C=D": -1,
|
||||
"int.valueobserver/": 1,
|
||||
"int.valueobserver/A=B": 1,
|
||||
}, out.Map)
|
||||
}
|
||||
|
||||
@@ -389,8 +389,8 @@ func TestRecordBatch(t *testing.T) {
|
||||
|
||||
counter1 := Must(meter).NewInt64Counter("int64.counter")
|
||||
counter2 := Must(meter).NewFloat64Counter("float64.counter")
|
||||
measure1 := Must(meter).NewInt64Measure("int64.measure")
|
||||
measure2 := Must(meter).NewFloat64Measure("float64.measure")
|
||||
valuerecorder1 := Must(meter).NewInt64ValueRecorder("int64.valuerecorder")
|
||||
valuerecorder2 := Must(meter).NewFloat64ValueRecorder("float64.valuerecorder")
|
||||
|
||||
sdk.RecordBatch(
|
||||
ctx,
|
||||
@@ -400,8 +400,8 @@ func TestRecordBatch(t *testing.T) {
|
||||
},
|
||||
counter1.Measurement(1),
|
||||
counter2.Measurement(2),
|
||||
measure1.Measurement(3),
|
||||
measure2.Measurement(4),
|
||||
valuerecorder1.Measurement(3),
|
||||
valuerecorder2.Measurement(4),
|
||||
)
|
||||
|
||||
sdk.Collect(ctx)
|
||||
@@ -411,10 +411,10 @@ func TestRecordBatch(t *testing.T) {
|
||||
_ = out.AddTo(rec)
|
||||
}
|
||||
require.EqualValues(t, map[string]float64{
|
||||
"int64.counter/A=B,C=D": 1,
|
||||
"float64.counter/A=B,C=D": 2,
|
||||
"int64.measure/A=B,C=D": 3,
|
||||
"float64.measure/A=B,C=D": 4,
|
||||
"int64.counter/A=B,C=D": 1,
|
||||
"float64.counter/A=B,C=D": 2,
|
||||
"int64.valuerecorder/A=B,C=D": 3,
|
||||
"float64.valuerecorder/A=B,C=D": 4,
|
||||
}, out.Map)
|
||||
}
|
||||
|
||||
|
||||
+24
-55
@@ -13,57 +13,34 @@
|
||||
// limitations under the License.
|
||||
|
||||
/*
|
||||
Package metric implements the OpenTelemetry metric.Meter API. The SDK
|
||||
supports configurable metrics export behavior through a collection of
|
||||
export interfaces that support various export strategies, described below.
|
||||
Package metric implements the OpenTelemetry metric.MeterImpl
|
||||
interface. The Accumulator type supports configurable metrics export
|
||||
behavior through a collection of export interfaces that support
|
||||
various export strategies, described below.
|
||||
|
||||
The metric.Meter API consists of methods for constructing each of the basic
|
||||
kinds of metric instrument. There are six types of instrument available to
|
||||
the end user, comprised of three basic kinds of metric instrument (Counter,
|
||||
Measure, Observer) crossed with two kinds of number (int64, float64).
|
||||
The metric.MeterImpl API consists of methods for constructing
|
||||
synchronous and asynchronous instruments. There are two constructors
|
||||
per instrument for the two kinds of number (int64, float64).
|
||||
|
||||
The API assists the SDK by consolidating the variety of metric instruments
|
||||
into a narrower interface, allowing the SDK to avoid repetition of
|
||||
boilerplate. The API and SDK are separated such that an event reaching
|
||||
the SDK has a uniform structure: an instrument, a label set, and a
|
||||
numerical value.
|
||||
Synchronous instruments are managed by a sync.Map containing a *record
|
||||
with the current state for each synchronous instrument. A bound
|
||||
instrument encapsulates a direct pointer to the record, allowing
|
||||
bound metric events to bypass a sync.Map lookup. A lock-free
|
||||
algorithm is used to protect against races when adding and removing
|
||||
items from the sync.Map.
|
||||
|
||||
To this end, the API uses a kv.Number type to represent either an int64
|
||||
or a float64, depending on the instrument's definition. A single
|
||||
implementation interface is used for counter and measure instruments,
|
||||
metric.InstrumentImpl, and a single implementation interface is used for
|
||||
their handles, metric.HandleImpl. For observers, the API defines
|
||||
interfaces, for which the SDK provides an implementation.
|
||||
|
||||
There are four entry points for events in the Metrics API - three for
|
||||
synchronous instruments (counters and measures) and one for asynchronous
|
||||
instruments (observers). The entry points for synchronous instruments are:
|
||||
via instrument handles, via direct instrument calls, and via BatchRecord.
|
||||
The SDK is designed with handles as the primary entry point, the other two
|
||||
entry points are implemented in terms of short-lived handles. For example,
|
||||
the implementation of a direct call allocates a handle, operates on the
|
||||
handle, and releases the handle. Similarly, the implementation of
|
||||
RecordBatch uses a short-lived handle for each measurement in the batch.
|
||||
The entry point for asynchronous instruments is via observer callbacks.
|
||||
Observer callbacks behave like a set of instrument handles - one for each
|
||||
observation for a distinct label set. The observer handles are alive as
|
||||
long as they are used. If the callback stops reporting values for a
|
||||
certain label set, the associated handle is dropped.
|
||||
Asynchronous instruments are managed by an internal
|
||||
AsyncInstrumentState, which coordinates calling batch and single
|
||||
instrument callbacks.
|
||||
|
||||
Internal Structure
|
||||
|
||||
The SDK is designed with minimal use of locking, to avoid adding
|
||||
contention for user-level code. For each handle, whether it is held by
|
||||
user-level code or a short-lived device, there exists an internal record
|
||||
managed by the SDK. Each internal record corresponds to a specific
|
||||
instrument and label set combination.
|
||||
|
||||
Each observer also has its own kind of record stored in the SDK. This
|
||||
record contains a set of recorders for every specific label set used in the
|
||||
callback.
|
||||
|
||||
A sync.Map maintains the mapping of current instruments and label sets to
|
||||
internal records. To create a new handle, the SDK consults the Map to
|
||||
internal records. To create a new bound instrument, the SDK consults the Map to
|
||||
locate an existing record, otherwise it constructs a new record. The SDK
|
||||
maintains a count of the number of references to each record, ensuring
|
||||
that records are not reclaimed from the Map while they are still active
|
||||
@@ -74,12 +51,7 @@ sweeps through all records in the SDK, checkpointing their state. When a
|
||||
record is discovered that has no references and has not been updated since
|
||||
the prior collection pass, it is removed from the Map.
|
||||
|
||||
The SDK maintains a current epoch number, corresponding to the number of
|
||||
completed collections. Each recorder of an observer record contains the
|
||||
last epoch during which it was updated. This variable allows the collection
|
||||
code path to detect stale recorders and remove them.
|
||||
|
||||
Each record of a handle and recorder of an observer has an associated
|
||||
Both synchronous and asynchronous instruments have an associated
|
||||
aggregator, which maintains the current state resulting from all metric
|
||||
events since its last checkpoint. Aggregators may be lock-free or they may
|
||||
use locking, but they should expect to be called concurrently. Aggregators
|
||||
@@ -97,21 +69,18 @@ enters the SDK resulting in a new record, and collection context,
|
||||
where a system-level thread performs a collection pass through the
|
||||
SDK.
|
||||
|
||||
Descriptor is a struct that describes the metric instrument to the export
|
||||
pipeline, containing the name, recommended aggregation keys, units,
|
||||
description, metric kind (counter or measure), number kind (int64 or
|
||||
float64), and whether the instrument has alternate semantics or not (i.e.,
|
||||
monotonic=false counter, absolute=false measure). A Descriptor accompanies
|
||||
metric data as it passes through the export pipeline.
|
||||
Descriptor is a struct that describes the metric instrument to the
|
||||
export pipeline, containing the name, units, description, metric kind,
|
||||
number kind (int64 or float64). A Descriptor accompanies metric data
|
||||
as it passes through the export pipeline.
|
||||
|
||||
The AggregationSelector interface supports choosing the method of
|
||||
aggregation to apply to a particular instrument. Given the Descriptor,
|
||||
this AggregatorFor method returns an implementation of Aggregator. If this
|
||||
interface returns nil, the metric will be disabled. The aggregator should
|
||||
be matched to the capabilities of the exporter. Selecting the aggregator
|
||||
for counter instruments is relatively straightforward, but for measure and
|
||||
observer instruments there are numerous choices with different cost and
|
||||
quality tradeoffs.
|
||||
for sum-only instruments is relatively straightforward, but many options
|
||||
are available for aggregating distributions from ValueRecorder instruments.
|
||||
|
||||
Aggregator is an interface which implements a concrete strategy for
|
||||
aggregating metric updates. Several Aggregator implementations are
|
||||
|
||||
@@ -38,7 +38,7 @@ func ExampleNew() {
|
||||
ctx := context.Background()
|
||||
|
||||
key := kv.Key("key")
|
||||
meter := pusher.Meter("example")
|
||||
meter := pusher.Provider().Meter("example")
|
||||
|
||||
counter := metric.Must(meter).NewInt64Counter("a.counter")
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ import (
|
||||
)
|
||||
|
||||
func TestStressInt64Histogram(t *testing.T) {
|
||||
desc := metric.NewDescriptor("some_metric", metric.MeasureKind, metric.Int64NumberKind)
|
||||
desc := metric.NewDescriptor("some_metric", metric.ValueRecorderKind, metric.Int64NumberKind)
|
||||
h := histogram.New(&desc, []metric.Number{metric.NewInt64Number(25), metric.NewInt64Number(50), metric.NewInt64Number(75)})
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
|
||||
@@ -47,9 +47,9 @@ type (
|
||||
var (
|
||||
// LastValueADesc and LastValueBDesc group by "G"
|
||||
LastValueADesc = metric.NewDescriptor(
|
||||
"lastvalue.a", metric.ObserverKind, metric.Int64NumberKind)
|
||||
"lastvalue.a", metric.ValueObserverKind, metric.Int64NumberKind)
|
||||
LastValueBDesc = metric.NewDescriptor(
|
||||
"lastvalue.b", metric.ObserverKind, metric.Int64NumberKind)
|
||||
"lastvalue.b", metric.ValueObserverKind, metric.Int64NumberKind)
|
||||
// CounterADesc and CounterBDesc group by "C"
|
||||
CounterADesc = metric.NewDescriptor(
|
||||
"sum.a", metric.CounterKind, metric.Int64NumberKind)
|
||||
@@ -92,7 +92,7 @@ func (*testAggregationSelector) AggregatorFor(desc *metric.Descriptor) export.Ag
|
||||
switch desc.MetricKind() {
|
||||
case metric.CounterKind:
|
||||
return sum.New()
|
||||
case metric.ObserverKind:
|
||||
case metric.ValueObserverKind:
|
||||
return lastvalue.New()
|
||||
default:
|
||||
panic("Invalid descriptor MetricKind for this test")
|
||||
|
||||
@@ -25,7 +25,7 @@ import (
|
||||
)
|
||||
|
||||
func TestStressInt64MinMaxSumCount(t *testing.T) {
|
||||
desc := metric.NewDescriptor("some_metric", metric.MeasureKind, metric.Int64NumberKind)
|
||||
desc := metric.NewDescriptor("some_metric", metric.ValueRecorderKind, metric.Int64NumberKind)
|
||||
mmsc := minmaxsumcount.New(&desc)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
@@ -42,48 +42,48 @@ var (
|
||||
_ export.AggregationSelector = selectorHistogram{}
|
||||
)
|
||||
|
||||
// NewWithInexpensiveMeasure returns a simple aggregation selector
|
||||
// NewWithInexpensiveDistribution returns a simple aggregation selector
|
||||
// that uses counter, minmaxsumcount and minmaxsumcount aggregators
|
||||
// for the three kinds of metric. This selector is faster and uses
|
||||
// less memory than the others because minmaxsumcount does not
|
||||
// aggregate quantile information.
|
||||
func NewWithInexpensiveMeasure() export.AggregationSelector {
|
||||
func NewWithInexpensiveDistribution() export.AggregationSelector {
|
||||
return selectorInexpensive{}
|
||||
}
|
||||
|
||||
// NewWithSketchMeasure returns a simple aggregation selector that
|
||||
// NewWithSketchDistribution returns a simple aggregation selector that
|
||||
// uses counter, ddsketch, and ddsketch aggregators for the three
|
||||
// kinds of metric. This selector uses more cpu and memory than the
|
||||
// NewWithInexpensiveMeasure because it uses one DDSketch per distinct
|
||||
// measure/observer and labelset.
|
||||
func NewWithSketchMeasure(config *ddsketch.Config) export.AggregationSelector {
|
||||
// NewWithInexpensiveDistribution because it uses one DDSketch per distinct
|
||||
// instrument and label set.
|
||||
func NewWithSketchDistribution(config *ddsketch.Config) export.AggregationSelector {
|
||||
return selectorSketch{
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
// NewWithExactMeasure returns a simple aggregation selector that uses
|
||||
// NewWithExactDistribution returns a simple aggregation selector that uses
|
||||
// counter, array, and array aggregators for the three kinds of metric.
|
||||
// This selector uses more memory than the NewWithSketchMeasure
|
||||
// This selector uses more memory than the NewWithSketchDistribution
|
||||
// because it aggregates an array of all values, therefore is able to
|
||||
// compute exact quantiles.
|
||||
func NewWithExactMeasure() export.AggregationSelector {
|
||||
func NewWithExactDistribution() export.AggregationSelector {
|
||||
return selectorExact{}
|
||||
}
|
||||
|
||||
// NewWithHistogramMeasure returns a simple aggregation selector that uses counter,
|
||||
// NewWithHistogramDistribution returns a simple aggregation selector that uses counter,
|
||||
// histogram, and histogram aggregators for the three kinds of metric. This
|
||||
// selector uses more memory than the NewWithInexpensiveMeasure because it
|
||||
// selector uses more memory than the NewWithInexpensiveDistribution because it
|
||||
// uses a counter per bucket.
|
||||
func NewWithHistogramMeasure(boundaries []metric.Number) export.AggregationSelector {
|
||||
func NewWithHistogramDistribution(boundaries []metric.Number) export.AggregationSelector {
|
||||
return selectorHistogram{boundaries: boundaries}
|
||||
}
|
||||
|
||||
func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
|
||||
switch descriptor.MetricKind() {
|
||||
case metric.ObserverKind:
|
||||
case metric.ValueObserverKind:
|
||||
fallthrough
|
||||
case metric.MeasureKind:
|
||||
case metric.ValueRecorderKind:
|
||||
return minmaxsumcount.New(descriptor)
|
||||
default:
|
||||
return sum.New()
|
||||
@@ -92,9 +92,9 @@ func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor) export.A
|
||||
|
||||
func (s selectorSketch) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
|
||||
switch descriptor.MetricKind() {
|
||||
case metric.ObserverKind:
|
||||
case metric.ValueObserverKind:
|
||||
fallthrough
|
||||
case metric.MeasureKind:
|
||||
case metric.ValueRecorderKind:
|
||||
return ddsketch.New(s.config, descriptor)
|
||||
default:
|
||||
return sum.New()
|
||||
@@ -103,9 +103,9 @@ func (s selectorSketch) AggregatorFor(descriptor *metric.Descriptor) export.Aggr
|
||||
|
||||
func (selectorExact) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
|
||||
switch descriptor.MetricKind() {
|
||||
case metric.ObserverKind:
|
||||
case metric.ValueObserverKind:
|
||||
fallthrough
|
||||
case metric.MeasureKind:
|
||||
case metric.ValueRecorderKind:
|
||||
return array.New()
|
||||
default:
|
||||
return sum.New()
|
||||
@@ -114,9 +114,9 @@ func (selectorExact) AggregatorFor(descriptor *metric.Descriptor) export.Aggrega
|
||||
|
||||
func (s selectorHistogram) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
|
||||
switch descriptor.MetricKind() {
|
||||
case metric.ObserverKind:
|
||||
case metric.ValueObserverKind:
|
||||
fallthrough
|
||||
case metric.MeasureKind:
|
||||
case metric.ValueRecorderKind:
|
||||
return histogram.New(descriptor, s.boundaries)
|
||||
default:
|
||||
return sum.New()
|
||||
|
||||
@@ -29,35 +29,35 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
testCounterDesc = metric.NewDescriptor("counter", metric.CounterKind, metric.Int64NumberKind)
|
||||
testMeasureDesc = metric.NewDescriptor("measure", metric.MeasureKind, metric.Int64NumberKind)
|
||||
testObserverDesc = metric.NewDescriptor("observer", metric.ObserverKind, metric.Int64NumberKind)
|
||||
testCounterDesc = metric.NewDescriptor("counter", metric.CounterKind, metric.Int64NumberKind)
|
||||
testValueRecorderDesc = metric.NewDescriptor("valuerecorder", metric.ValueRecorderKind, metric.Int64NumberKind)
|
||||
testValueObserverDesc = metric.NewDescriptor("valueobserver", metric.ValueObserverKind, metric.Int64NumberKind)
|
||||
)
|
||||
|
||||
func TestInexpensiveMeasure(t *testing.T) {
|
||||
inex := simple.NewWithInexpensiveMeasure()
|
||||
func TestInexpensiveDistribution(t *testing.T) {
|
||||
inex := simple.NewWithInexpensiveDistribution()
|
||||
require.NotPanics(t, func() { _ = inex.AggregatorFor(&testCounterDesc).(*sum.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = inex.AggregatorFor(&testMeasureDesc).(*minmaxsumcount.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = inex.AggregatorFor(&testObserverDesc).(*minmaxsumcount.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = inex.AggregatorFor(&testValueRecorderDesc).(*minmaxsumcount.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = inex.AggregatorFor(&testValueObserverDesc).(*minmaxsumcount.Aggregator) })
|
||||
}
|
||||
|
||||
func TestSketchMeasure(t *testing.T) {
|
||||
sk := simple.NewWithSketchMeasure(ddsketch.NewDefaultConfig())
|
||||
func TestSketchDistribution(t *testing.T) {
|
||||
sk := simple.NewWithSketchDistribution(ddsketch.NewDefaultConfig())
|
||||
require.NotPanics(t, func() { _ = sk.AggregatorFor(&testCounterDesc).(*sum.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = sk.AggregatorFor(&testMeasureDesc).(*ddsketch.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = sk.AggregatorFor(&testObserverDesc).(*ddsketch.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = sk.AggregatorFor(&testValueRecorderDesc).(*ddsketch.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = sk.AggregatorFor(&testValueObserverDesc).(*ddsketch.Aggregator) })
|
||||
}
|
||||
|
||||
func TestExactMeasure(t *testing.T) {
|
||||
ex := simple.NewWithExactMeasure()
|
||||
func TestExactDistribution(t *testing.T) {
|
||||
ex := simple.NewWithExactDistribution()
|
||||
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testCounterDesc).(*sum.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testMeasureDesc).(*array.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testObserverDesc).(*array.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testValueRecorderDesc).(*array.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testValueObserverDesc).(*array.Aggregator) })
|
||||
}
|
||||
|
||||
func TestHistogramMeasure(t *testing.T) {
|
||||
ex := simple.NewWithHistogramMeasure([]metric.Number{})
|
||||
func TestHistogramDistribution(t *testing.T) {
|
||||
ex := simple.NewWithHistogramDistribution([]metric.Number{})
|
||||
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testCounterDesc).(*sum.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testMeasureDesc).(*histogram.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testObserverDesc).(*histogram.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testValueRecorderDesc).(*histogram.Aggregator) })
|
||||
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testValueObserverDesc).(*histogram.Aggregator) })
|
||||
}
|
||||
|
||||
@@ -285,7 +285,7 @@ func (f *testFixture) Process(_ context.Context, record export.Record) error {
|
||||
f.T.Fatal("Sum error: ", err)
|
||||
}
|
||||
f.impl.storeCollect(actual, sum, time.Time{})
|
||||
case metric.MeasureKind:
|
||||
case metric.ValueRecorderKind:
|
||||
lv, ts, err := agg.(aggregator.LastValue).LastValue()
|
||||
if err != nil && err != aggregator.ErrNoData {
|
||||
f.T.Fatal("Last value error: ", err)
|
||||
@@ -431,15 +431,15 @@ func TestStressFloat64Counter(t *testing.T) {
|
||||
func intLastValueTestImpl() testImpl {
|
||||
return testImpl{
|
||||
newInstrument: func(meter api.Meter, name string) SyncImpler {
|
||||
return Must(meter).NewInt64Measure(name + ".lastvalue")
|
||||
return Must(meter).NewInt64ValueRecorder(name + ".lastvalue")
|
||||
},
|
||||
getUpdateValue: func() api.Number {
|
||||
r1 := rand.Int63()
|
||||
return api.NewInt64Number(rand.Int63() - r1)
|
||||
},
|
||||
operate: func(inst interface{}, ctx context.Context, value api.Number, labels []kv.KeyValue) {
|
||||
measure := inst.(api.Int64Measure)
|
||||
measure.Record(ctx, value.AsInt64(), labels...)
|
||||
valuerecorder := inst.(api.Int64ValueRecorder)
|
||||
valuerecorder.Record(ctx, value.AsInt64(), labels...)
|
||||
},
|
||||
newStore: func() interface{} {
|
||||
return &lastValueState{
|
||||
@@ -473,14 +473,14 @@ func TestStressInt64LastValue(t *testing.T) {
|
||||
func floatLastValueTestImpl() testImpl {
|
||||
return testImpl{
|
||||
newInstrument: func(meter api.Meter, name string) SyncImpler {
|
||||
return Must(meter).NewFloat64Measure(name + ".lastvalue")
|
||||
return Must(meter).NewFloat64ValueRecorder(name + ".lastvalue")
|
||||
},
|
||||
getUpdateValue: func() api.Number {
|
||||
return api.NewFloat64Number((-0.5 + rand.Float64()) * 100000)
|
||||
},
|
||||
operate: func(inst interface{}, ctx context.Context, value api.Number, labels []kv.KeyValue) {
|
||||
measure := inst.(api.Float64Measure)
|
||||
measure.Record(ctx, value.AsFloat64(), labels...)
|
||||
valuerecorder := inst.(api.Float64ValueRecorder)
|
||||
valuerecorder.Record(ctx, value.AsFloat64(), labels...)
|
||||
},
|
||||
newStore: func() interface{} {
|
||||
return &lastValueState{
|
||||
|
||||
@@ -1,80 +0,0 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package resourcekeys contains well known type and label keys for resources.
|
||||
package resourcekeys // import "go.opentelemetry.io/otel/sdk/resource/resourcekeys"
|
||||
|
||||
// Constants for Service resources.
|
||||
const (
|
||||
// A uniquely identifying name for a Service.
|
||||
ServiceKeyName = "service.name"
|
||||
ServiceKeyNamespace = "service.namespace"
|
||||
ServiceKeyInstanceID = "service.instance.id"
|
||||
ServiceKeyVersion = "service.version"
|
||||
)
|
||||
|
||||
// Constants for Library resources.
|
||||
const (
|
||||
// A uniquely identifying name for a Library.
|
||||
LibraryKeyName = "library.name"
|
||||
LibraryKeyLanguage = "library.language"
|
||||
LibraryKeyVersion = "library.version"
|
||||
)
|
||||
|
||||
// Constants for Kubernetes resources.
|
||||
const (
|
||||
// A uniquely identifying name for the Kubernetes cluster. Kubernetes
|
||||
// does not have cluster names as an internal concept so this may be
|
||||
// set to any meaningful value within the environment. For example,
|
||||
// GKE clusters have a name which can be used for this label.
|
||||
K8SKeyClusterName = "k8s.cluster.name"
|
||||
K8SKeyNamespaceName = "k8s.namespace.name"
|
||||
K8SKeyPodName = "k8s.pod.name"
|
||||
K8SKeyDeploymentName = "k8s.deployment.name"
|
||||
)
|
||||
|
||||
// Constants for Container resources.
|
||||
const (
|
||||
// A uniquely identifying name for the Container.
|
||||
ContainerKeyName = "container.name"
|
||||
ContainerKeyImageName = "container.image.name"
|
||||
ContainerKeyImageTag = "container.image.tag"
|
||||
)
|
||||
|
||||
// Constants for Cloud resources.
|
||||
const (
|
||||
CloudKeyProvider = "cloud.provider"
|
||||
CloudKeyAccountID = "cloud.account.id"
|
||||
CloudKeyRegion = "cloud.region"
|
||||
CloudKeyZone = "cloud.zone"
|
||||
|
||||
// Cloud Providers
|
||||
CloudProviderAWS = "aws"
|
||||
CloudProviderGCP = "gcp"
|
||||
CloudProviderAZURE = "azure"
|
||||
)
|
||||
|
||||
// Constants for Host resources.
|
||||
const (
|
||||
// A uniquely identifying name for the host.
|
||||
HostKeyName = "host.name"
|
||||
|
||||
// A hostname as returned by the 'hostname' command on host machine.
|
||||
HostKeyHostName = "host.hostname"
|
||||
HostKeyID = "host.id"
|
||||
HostKeyType = "host.type"
|
||||
HostKeyImageName = "host.image.name"
|
||||
HostKeyImageID = "host.image.id"
|
||||
HostKeyImageVersion = "host.image.version"
|
||||
)
|
||||
Reference in New Issue
Block a user