mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-26 03:52:03 +02:00
d091ba88e4
* Return empty nil aggs if no meas * Update tests with new expected behavior * Add change to changelog * Set PR number in changelog * Run lint * Fix pipeline_test * Scope change in changelog to pkg * Clean up init of agg types
243 lines
6.9 KiB
Go
243 lines
6.9 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
|
|
|
|
import (
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/sdk/metric/aggregation"
|
|
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
|
)
|
|
|
|
type buckets struct {
|
|
counts []uint64
|
|
count uint64
|
|
sum float64
|
|
min, max float64
|
|
}
|
|
|
|
// newBuckets returns buckets with n bins.
|
|
func newBuckets(n int) *buckets {
|
|
return &buckets{counts: make([]uint64, n)}
|
|
}
|
|
|
|
func (b *buckets) bin(idx int, value float64) {
|
|
b.counts[idx]++
|
|
b.count++
|
|
b.sum += value
|
|
if value < b.min {
|
|
b.min = value
|
|
} else if value > b.max {
|
|
b.max = value
|
|
}
|
|
}
|
|
|
|
// histValues summarizes a set of measurements as an histValues with
|
|
// explicitly defined buckets.
|
|
type histValues[N int64 | float64] struct {
|
|
bounds []float64
|
|
|
|
values map[attribute.Set]*buckets
|
|
valuesMu sync.Mutex
|
|
}
|
|
|
|
func newHistValues[N int64 | float64](bounds []float64) *histValues[N] {
|
|
// The responsibility of keeping all buckets correctly associated with the
|
|
// passed boundaries is ultimately this type's responsibility. Make a copy
|
|
// here so we can always guarantee this. Or, in the case of failure, have
|
|
// complete control over the fix.
|
|
b := make([]float64, len(bounds))
|
|
copy(b, bounds)
|
|
sort.Float64s(b)
|
|
return &histValues[N]{
|
|
bounds: b,
|
|
values: make(map[attribute.Set]*buckets),
|
|
}
|
|
}
|
|
|
|
// Aggregate records the measurement value, scoped by attr, and aggregates it
|
|
// into a histogram.
|
|
func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
|
|
// Accept all types to satisfy the Aggregator interface. However, since
|
|
// the Aggregation produced by this Aggregator is only float64, convert
|
|
// here to only use this type.
|
|
v := float64(value)
|
|
|
|
// This search will return an index in the range [0, len(s.bounds)], where
|
|
// it will return len(s.bounds) if value is greater than the last element
|
|
// of s.bounds. This aligns with the buckets in that the length of buckets
|
|
// is len(s.bounds)+1, with the last bucket representing:
|
|
// (s.bounds[len(s.bounds)-1], +∞).
|
|
idx := sort.SearchFloat64s(s.bounds, v)
|
|
|
|
s.valuesMu.Lock()
|
|
defer s.valuesMu.Unlock()
|
|
|
|
b, ok := s.values[attr]
|
|
if !ok {
|
|
// N+1 buckets. For example:
|
|
//
|
|
// bounds = [0, 5, 10]
|
|
//
|
|
// Then,
|
|
//
|
|
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
|
|
b = newBuckets(len(s.bounds) + 1)
|
|
// Ensure min and max are recorded values (not zero), for new buckets.
|
|
b.min, b.max = v, v
|
|
s.values[attr] = b
|
|
}
|
|
b.bin(idx, v)
|
|
}
|
|
|
|
// NewDeltaHistogram returns an Aggregator that summarizes a set of
|
|
// measurements as an histogram. Each histogram is scoped by attributes and
|
|
// the aggregation cycle the measurements were made in.
|
|
//
|
|
// Each aggregation cycle is treated independently. When the returned
|
|
// Aggregator's Aggregations method is called it will reset all histogram
|
|
// counts to zero.
|
|
func NewDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] {
|
|
return &deltaHistogram[N]{
|
|
histValues: newHistValues[N](cfg.Boundaries),
|
|
noMinMax: cfg.NoMinMax,
|
|
start: now(),
|
|
}
|
|
}
|
|
|
|
// deltaHistogram summarizes a set of measurements made in a single
|
|
// aggregation cycle as an histogram with explicitly defined buckets.
|
|
type deltaHistogram[N int64 | float64] struct {
|
|
*histValues[N]
|
|
|
|
noMinMax bool
|
|
start time.Time
|
|
}
|
|
|
|
func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation {
|
|
s.valuesMu.Lock()
|
|
defer s.valuesMu.Unlock()
|
|
|
|
if len(s.values) == 0 {
|
|
return nil
|
|
}
|
|
|
|
t := now()
|
|
// Do not allow modification of our copy of bounds.
|
|
bounds := make([]float64, len(s.bounds))
|
|
copy(bounds, s.bounds)
|
|
h := metricdata.Histogram{
|
|
Temporality: metricdata.DeltaTemporality,
|
|
DataPoints: make([]metricdata.HistogramDataPoint, 0, len(s.values)),
|
|
}
|
|
for a, b := range s.values {
|
|
hdp := metricdata.HistogramDataPoint{
|
|
Attributes: a,
|
|
StartTime: s.start,
|
|
Time: t,
|
|
Count: b.count,
|
|
Bounds: bounds,
|
|
BucketCounts: b.counts,
|
|
Sum: b.sum,
|
|
}
|
|
if !s.noMinMax {
|
|
hdp.Min = &b.min
|
|
hdp.Max = &b.max
|
|
}
|
|
h.DataPoints = append(h.DataPoints, hdp)
|
|
|
|
// Unused attribute sets do not report.
|
|
delete(s.values, a)
|
|
}
|
|
// The delta collection cycle resets.
|
|
s.start = t
|
|
return h
|
|
}
|
|
|
|
// NewCumulativeHistogram returns an Aggregator that summarizes a set of
|
|
// measurements as an histogram. Each histogram is scoped by attributes.
|
|
//
|
|
// Each aggregation cycle builds from the previous, the histogram counts are
|
|
// the bucketed counts of all values aggregated since the returned Aggregator
|
|
// was created.
|
|
func NewCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] {
|
|
return &cumulativeHistogram[N]{
|
|
histValues: newHistValues[N](cfg.Boundaries),
|
|
noMinMax: cfg.NoMinMax,
|
|
start: now(),
|
|
}
|
|
}
|
|
|
|
// cumulativeHistogram summarizes a set of measurements made over all
|
|
// aggregation cycles as an histogram with explicitly defined buckets.
|
|
type cumulativeHistogram[N int64 | float64] struct {
|
|
*histValues[N]
|
|
|
|
noMinMax bool
|
|
start time.Time
|
|
}
|
|
|
|
func (s *cumulativeHistogram[N]) Aggregation() metricdata.Aggregation {
|
|
s.valuesMu.Lock()
|
|
defer s.valuesMu.Unlock()
|
|
|
|
if len(s.values) == 0 {
|
|
return nil
|
|
}
|
|
|
|
t := now()
|
|
// Do not allow modification of our copy of bounds.
|
|
bounds := make([]float64, len(s.bounds))
|
|
copy(bounds, s.bounds)
|
|
h := metricdata.Histogram{
|
|
Temporality: metricdata.CumulativeTemporality,
|
|
DataPoints: make([]metricdata.HistogramDataPoint, 0, len(s.values)),
|
|
}
|
|
for a, b := range s.values {
|
|
// The HistogramDataPoint field values returned need to be copies of
|
|
// the buckets value as we will keep updating them.
|
|
//
|
|
// TODO (#3047): Making copies for bounds and counts incurs a large
|
|
// memory allocation footprint. Alternatives should be explored.
|
|
counts := make([]uint64, len(b.counts))
|
|
copy(counts, b.counts)
|
|
|
|
hdp := metricdata.HistogramDataPoint{
|
|
Attributes: a,
|
|
StartTime: s.start,
|
|
Time: t,
|
|
Count: b.count,
|
|
Bounds: bounds,
|
|
BucketCounts: counts,
|
|
Sum: b.sum,
|
|
}
|
|
if !s.noMinMax {
|
|
// Similar to counts, make a copy.
|
|
min, max := b.min, b.max
|
|
hdp.Min = &min
|
|
hdp.Max = &max
|
|
}
|
|
h.DataPoints = append(h.DataPoints, hdp)
|
|
// TODO (#3006): This will use an unbounded amount of memory if there
|
|
// are unbounded number of attribute sets being aggregated. Attribute
|
|
// sets that become "stale" need to be forgotten so this will not
|
|
// overload the system.
|
|
}
|
|
return h
|
|
}
|