1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-11-29 23:07:45 +02:00

Move aggs to internal/aggregate (#4283)

This commit is contained in:
Tyler Yahn
2023-07-03 01:53:00 -07:00
committed by GitHub
parent 97273da7c9
commit 10c3445543
17 changed files with 75 additions and 74 deletions

View File

@@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
)
@@ -233,16 +234,16 @@ func newInserter[N int64 | float64](p *pipeline, vc *cache[string, streamID]) *i
//
// If an instrument is determined to use a Drop aggregation, that instrument is
// not inserted nor returned.
func (i *inserter[N]) Instrument(inst Instrument) ([]internal.Aggregator[N], error) {
func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Aggregator[N], error) {
var (
matched bool
aggs []internal.Aggregator[N]
aggs []aggregate.Aggregator[N]
)
errs := &multierror{wrapped: errCreatingAggregators}
// The cache will return the same Aggregator instance. Use this fact to
// compare pointer addresses to deduplicate Aggregators.
seen := make(map[internal.Aggregator[N]]struct{})
seen := make(map[aggregate.Aggregator[N]]struct{})
for _, v := range i.pipeline.views {
stream, match := v(inst)
if !match {
@@ -288,7 +289,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]internal.Aggregator[N], err
// aggVal is the cached value in an aggregators cache.
type aggVal[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Aggregator aggregate.Aggregator[N]
Err error
}
@@ -305,7 +306,7 @@ type aggVal[N int64 | float64] struct {
//
// If the instrument defines an unknown or incompatible aggregation, an error
// is returned.
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (internal.Aggregator[N], error) {
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (aggregate.Aggregator[N], error) {
switch stream.Aggregation.(type) {
case nil, aggregation.Default:
// Undefined, nil, means to use the default from the reader.
@@ -332,7 +333,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
return aggVal[N]{nil, nil}
}
if stream.AttributeFilter != nil {
agg = internal.NewFilter(agg, stream.AttributeFilter)
agg = aggregate.NewFilter(agg, stream.AttributeFilter)
}
i.pipeline.addSync(scope, instrumentSync{
@@ -388,14 +389,14 @@ func (i *inserter[N]) streamID(kind InstrumentKind, stream Stream) streamID {
// aggregator returns a new Aggregator matching agg, kind, temporality, and
// monotonic. If the agg is unknown or temporality is invalid, an error is
// returned.
func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind InstrumentKind, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) {
func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind InstrumentKind, temporality metricdata.Temporality, monotonic bool) (aggregate.Aggregator[N], error) {
switch a := agg.(type) {
case aggregation.Default:
return i.aggregator(DefaultAggregationSelector(kind), kind, temporality, monotonic)
case aggregation.Drop:
return nil, nil
case aggregation.LastValue:
return internal.NewLastValue[N](), nil
return aggregate.NewLastValue[N](), nil
case aggregation.Sum:
switch kind {
case InstrumentKindObservableCounter, InstrumentKindObservableUpDownCounter:
@@ -404,9 +405,9 @@ func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind InstrumentKin
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/metrics/api.md#asynchronous-counter-creation
switch temporality {
case metricdata.CumulativeTemporality:
return internal.NewPrecomputedCumulativeSum[N](monotonic), nil
return aggregate.NewPrecomputedCumulativeSum[N](monotonic), nil
case metricdata.DeltaTemporality:
return internal.NewPrecomputedDeltaSum[N](monotonic), nil
return aggregate.NewPrecomputedDeltaSum[N](monotonic), nil
default:
return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality)
}
@@ -414,18 +415,18 @@ func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind InstrumentKin
switch temporality {
case metricdata.CumulativeTemporality:
return internal.NewCumulativeSum[N](monotonic), nil
return aggregate.NewCumulativeSum[N](monotonic), nil
case metricdata.DeltaTemporality:
return internal.NewDeltaSum[N](monotonic), nil
return aggregate.NewDeltaSum[N](monotonic), nil
default:
return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality)
}
case aggregation.ExplicitBucketHistogram:
switch temporality {
case metricdata.CumulativeTemporality:
return internal.NewCumulativeHistogram[N](a), nil
return aggregate.NewCumulativeHistogram[N](a), nil
case metricdata.DeltaTemporality:
return internal.NewDeltaHistogram[N](a), nil
return aggregate.NewDeltaHistogram[N](a), nil
default:
return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality)
}
@@ -536,8 +537,8 @@ func newResolver[N int64 | float64](p pipelines, vc *cache[string, streamID]) re
// Aggregators returns the Aggregators that must be updated by the instrument
// defined by key.
func (r resolver[N]) Aggregators(id Instrument) ([]internal.Aggregator[N], error) {
var aggs []internal.Aggregator[N]
func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Aggregator[N], error) {
var aggs []aggregate.Aggregator[N]
errs := &multierror{}
for _, i := range r.inserters {