You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-11-29 23:07:45 +02:00
Flatten sdk/metric/aggregation into sdk/metric (#4435)
* Deprecate the aggregation pkg * Decouple the internal/aggregate from aggregation pkg * Add Aggregation to the metric pkg * Do not use sdk/metric/aggregation in stdoutmetric exporter * Update all generated templates * Update prom exporter * Fix view example * Add changes to changelog * Update CHANGELOG.md Co-authored-by: Robert Pająk <pellared@hotmail.com> * Rename Sum to AggregationSum * Fix comments * Centralize validation of aggregation in pipeline * Remove validation of agg in manual_reader selector opt * Fix merge --------- Co-authored-by: Robert Pająk <pellared@hotmail.com>
This commit is contained in:
@@ -27,7 +27,6 @@ import (
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/embedded"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal"
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
@@ -308,14 +307,28 @@ type aggVal[N int64 | float64] struct {
|
||||
// is returned.
|
||||
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (meas aggregate.Measure[N], aggID uint64, err error) {
|
||||
switch stream.Aggregation.(type) {
|
||||
case nil, aggregation.Default:
|
||||
case nil:
|
||||
// Undefined, nil, means to use the default from the reader.
|
||||
stream.Aggregation = i.pipeline.reader.aggregation(kind)
|
||||
switch stream.Aggregation.(type) {
|
||||
case nil, aggregation.Default:
|
||||
case nil, AggregationDefault:
|
||||
// If the reader returns default or nil use the default selector.
|
||||
stream.Aggregation = DefaultAggregationSelector(kind)
|
||||
default:
|
||||
// Deep copy and validate before using.
|
||||
stream.Aggregation = stream.Aggregation.copy()
|
||||
if err := stream.Aggregation.err(); err != nil {
|
||||
orig := stream.Aggregation
|
||||
stream.Aggregation = DefaultAggregationSelector(kind)
|
||||
global.Error(
|
||||
err, "using default aggregation instead",
|
||||
"aggregation", orig,
|
||||
"replacement", stream.Aggregation,
|
||||
)
|
||||
}
|
||||
}
|
||||
case AggregationDefault:
|
||||
stream.Aggregation = DefaultAggregationSelector(kind)
|
||||
}
|
||||
|
||||
if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {
|
||||
@@ -423,15 +436,15 @@ func (i *inserter[N]) instID(kind InstrumentKind, stream Stream) instID {
|
||||
// aggregateFunc returns new aggregate functions matching agg, kind, and
|
||||
// monotonic. If the agg is unknown or temporality is invalid, an error is
|
||||
// returned.
|
||||
func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggregation, kind InstrumentKind) (meas aggregate.Measure[N], comp aggregate.ComputeAggregation, err error) {
|
||||
func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg Aggregation, kind InstrumentKind) (meas aggregate.Measure[N], comp aggregate.ComputeAggregation, err error) {
|
||||
switch a := agg.(type) {
|
||||
case aggregation.Default:
|
||||
case AggregationDefault:
|
||||
return i.aggregateFunc(b, DefaultAggregationSelector(kind), kind)
|
||||
case aggregation.Drop:
|
||||
case AggregationDrop:
|
||||
// Return nil in and out to signify the drop aggregator.
|
||||
case aggregation.LastValue:
|
||||
case AggregationLastValue:
|
||||
meas, comp = b.LastValue()
|
||||
case aggregation.Sum:
|
||||
case AggregationSum:
|
||||
switch kind {
|
||||
case InstrumentKindObservableCounter:
|
||||
meas, comp = b.PrecomputedSum(true)
|
||||
@@ -444,7 +457,7 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr
|
||||
// instrumentKindUndefined or other invalid instrument kinds.
|
||||
meas, comp = b.Sum(false)
|
||||
}
|
||||
case aggregation.ExplicitBucketHistogram:
|
||||
case AggregationExplicitBucketHistogram:
|
||||
var noSum bool
|
||||
switch kind {
|
||||
case InstrumentKindUpDownCounter, InstrumentKindObservableUpDownCounter, InstrumentKindObservableGauge:
|
||||
@@ -453,8 +466,8 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr
|
||||
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#histogram-aggregations
|
||||
noSum = true
|
||||
}
|
||||
meas, comp = b.ExplicitBucketHistogram(a, noSum)
|
||||
case aggregation.Base2ExponentialHistogram:
|
||||
meas, comp = b.ExplicitBucketHistogram(a.Boundaries, a.NoMinMax, noSum)
|
||||
case AggregationBase2ExponentialHistogram:
|
||||
var noSum bool
|
||||
switch kind {
|
||||
case InstrumentKindUpDownCounter, InstrumentKindObservableUpDownCounter, InstrumentKindObservableGauge:
|
||||
@@ -463,7 +476,7 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr
|
||||
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#histogram-aggregations
|
||||
noSum = true
|
||||
}
|
||||
meas, comp = b.ExponentialBucketHistogram(a, noSum)
|
||||
meas, comp = b.ExponentialBucketHistogram(a.MaxSize, a.MaxScale, a.NoMinMax, noSum)
|
||||
|
||||
default:
|
||||
err = errUnknownAggregation
|
||||
@@ -483,11 +496,11 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr
|
||||
// | Observable Counter | ✓ | | ✓ | ✓ | ✓ |
|
||||
// | Observable UpDownCounter | ✓ | | ✓ | ✓ | ✓ |
|
||||
// | Observable Gauge | ✓ | ✓ | | ✓ | ✓ |.
|
||||
func isAggregatorCompatible(kind InstrumentKind, agg aggregation.Aggregation) error {
|
||||
func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error {
|
||||
switch agg.(type) {
|
||||
case aggregation.Default:
|
||||
case AggregationDefault:
|
||||
return nil
|
||||
case aggregation.ExplicitBucketHistogram, aggregation.Base2ExponentialHistogram:
|
||||
case AggregationExplicitBucketHistogram, AggregationBase2ExponentialHistogram:
|
||||
switch kind {
|
||||
case InstrumentKindCounter,
|
||||
InstrumentKindUpDownCounter,
|
||||
@@ -499,7 +512,7 @@ func isAggregatorCompatible(kind InstrumentKind, agg aggregation.Aggregation) er
|
||||
default:
|
||||
return errIncompatibleAggregation
|
||||
}
|
||||
case aggregation.Sum:
|
||||
case AggregationSum:
|
||||
switch kind {
|
||||
case InstrumentKindObservableCounter, InstrumentKindObservableUpDownCounter, InstrumentKindCounter, InstrumentKindHistogram, InstrumentKindUpDownCounter:
|
||||
return nil
|
||||
@@ -508,14 +521,14 @@ func isAggregatorCompatible(kind InstrumentKind, agg aggregation.Aggregation) er
|
||||
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
|
||||
return errIncompatibleAggregation
|
||||
}
|
||||
case aggregation.LastValue:
|
||||
case AggregationLastValue:
|
||||
if kind == InstrumentKindObservableGauge {
|
||||
return nil
|
||||
}
|
||||
// TODO: review need for aggregation check after
|
||||
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
|
||||
return errIncompatibleAggregation
|
||||
case aggregation.Drop:
|
||||
case AggregationDrop:
|
||||
return nil
|
||||
default:
|
||||
// This is used passed checking for default, it should be an error at this point.
|
||||
|
||||
Reference in New Issue
Block a user