1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-05 22:54:18 +02:00

Move Aggregator interface to aggregator package (#2444)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
Bogdan Drutu 2021-12-15 08:13:48 -08:00 committed by GitHub
parent 4702f6fd6f
commit 4654d78104
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 125 additions and 120 deletions

View File

@ -33,6 +33,7 @@ import (
"go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export"
@ -682,7 +683,7 @@ func runMetricExportTests(t *testing.T, opts []otlpmetric.Option, res *resource.
desc := metrictest.NewDescriptor(r.name, r.iKind, r.nKind) desc := metrictest.NewDescriptor(r.name, r.iKind, r.nKind)
labs := attribute.NewSet(lcopy...) labs := attribute.NewSet(lcopy...)
var agg, ckpt export.Aggregator var agg, ckpt aggregator.Aggregator
if r.iKind.Adding() { if r.iKind.Adding() {
sums := sum.New(2) sums := sum.New(2)
agg, ckpt = &sums[0], &sums[1] agg, ckpt = &sums[0], &sums[1]

View File

@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel/metric/metrictest" "go.opentelemetry.io/otel/metric/metrictest"
"go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export"
@ -241,10 +242,10 @@ func (t *testAgg) Aggregation() aggregation.Aggregation {
func (t *testAgg) Update(ctx context.Context, number number.Number, descriptor *sdkapi.Descriptor) error { func (t *testAgg) Update(ctx context.Context, number number.Number, descriptor *sdkapi.Descriptor) error {
return nil return nil
} }
func (t *testAgg) SynchronizedMove(destination export.Aggregator, descriptor *sdkapi.Descriptor) error { func (t *testAgg) SynchronizedMove(destination aggregator.Aggregator, descriptor *sdkapi.Descriptor) error {
return nil return nil
} }
func (t *testAgg) Merge(aggregator export.Aggregator, descriptor *sdkapi.Descriptor) error { func (t *testAgg) Merge(aggregator aggregator.Aggregator, descriptor *sdkapi.Descriptor) error {
return nil return nil
} }
@ -270,7 +271,7 @@ func (te *testErrSum) Kind() aggregation.Kind {
return aggregation.SumKind return aggregation.SumKind
} }
var _ export.Aggregator = &testAgg{} var _ aggregator.Aggregator = &testAgg{}
var _ aggregation.Aggregation = &testAgg{} var _ aggregation.Aggregation = &testAgg{}
var _ aggregation.Sum = &testErrSum{} var _ aggregation.Sum = &testErrSum{}
var _ aggregation.LastValue = &testErrLastValue{} var _ aggregation.LastValue = &testErrLastValue{}

View File

@ -19,6 +19,7 @@ import (
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation" "go.opentelemetry.io/otel/sdk/metric/export/aggregation"
) )
@ -26,8 +27,8 @@ import (
// Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/export" // Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/export"
type Accumulation = export.Accumulation type Accumulation = export.Accumulation
// Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/export" // Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/aggregator"
type Aggregator = export.Aggregator type Aggregator = aggregator.Aggregator
// Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/export" // Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/export"
type AggregatorSelector = export.AggregatorSelector type AggregatorSelector = export.AggregatorSelector

View File

@ -15,19 +15,81 @@
package aggregator // import "go.opentelemetry.io/otel/sdk/metric/aggregator" package aggregator // import "go.opentelemetry.io/otel/sdk/metric/aggregator"
import ( import (
"context"
"fmt" "fmt"
"math" "math"
"go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation" "go.opentelemetry.io/otel/sdk/metric/export/aggregation"
) )
// Aggregator implements a specific aggregation behavior, e.g., a
// behavior to track a sequence of updates to an instrument. Counter
// instruments commonly use a simple Sum aggregator, but for the
// distribution instruments (Histogram, GaugeObserver) there are a
// number of possible aggregators with different cost and accuracy
// tradeoffs.
//
// Note that any Aggregator may be attached to any instrument--this is
// the result of the OpenTelemetry API/SDK separation. It is possible
// to attach a Sum aggregator to a Histogram instrument.
type Aggregator interface {
// Aggregation returns an Aggregation interface to access the
// current state of this Aggregator. The caller is
// responsible for synchronization and must not call any the
// other methods in this interface concurrently while using
// the Aggregation.
Aggregation() aggregation.Aggregation
// Update receives a new measured value and incorporates it
// into the aggregation. Update() calls may be called
// concurrently.
//
// Descriptor.NumberKind() should be consulted to determine
// whether the provided number is an int64 or float64.
//
// The Context argument comes from user-level code and could be
// inspected for a `correlation.Map` or `trace.SpanContext`.
Update(ctx context.Context, number number.Number, descriptor *sdkapi.Descriptor) error
// SynchronizedMove is called during collection to finish one
// period of aggregation by atomically saving the
// currently-updating state into the argument Aggregator AND
// resetting the current value to the zero state.
//
// SynchronizedMove() is called concurrently with Update(). These
// two methods must be synchronized with respect to each
// other, for correctness.
//
// After saving a synchronized copy, the Aggregator can be converted
// into one or more of the interfaces in the `aggregation` sub-package,
// according to kind of Aggregator that was selected.
//
// This method will return an InconsistentAggregatorError if
// this Aggregator cannot be copied into the destination due
// to an incompatible type.
//
// This call has no Context argument because it is expected to
// perform only computation.
//
// When called with a nil `destination`, this Aggregator is reset
// and the current value is discarded.
SynchronizedMove(destination Aggregator, descriptor *sdkapi.Descriptor) error
// Merge combines the checkpointed state from the argument
// Aggregator into this Aggregator. Merge is not synchronized
// with respect to Update or SynchronizedMove.
//
// The owner of an Aggregator being merged is responsible for
// synchronization of both Aggregator states.
Merge(aggregator Aggregator, descriptor *sdkapi.Descriptor) error
}
// NewInconsistentAggregatorError formats an error describing an attempt to // NewInconsistentAggregatorError formats an error describing an attempt to
// Checkpoint or Merge different-type aggregators. The result can be unwrapped as // Checkpoint or Merge different-type aggregators. The result can be unwrapped as
// an ErrInconsistentType. // an ErrInconsistentType.
func NewInconsistentAggregatorError(a1, a2 export.Aggregator) error { func NewInconsistentAggregatorError(a1, a2 Aggregator) error {
return fmt.Errorf("%w: %T and %T", aggregation.ErrInconsistentType, a1, a2) return fmt.Errorf("%w: %T and %T", aggregation.ErrInconsistentType, a1, a2)
} }

View File

@ -30,7 +30,6 @@ import (
"go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation" "go.opentelemetry.io/otel/sdk/metric/export/aggregation"
) )
@ -44,7 +43,7 @@ type Profile struct {
type NoopAggregator struct{} type NoopAggregator struct{}
type NoopAggregation struct{} type NoopAggregation struct{}
var _ export.Aggregator = NoopAggregator{} var _ aggregator.Aggregator = NoopAggregator{}
var _ aggregation.Aggregation = NoopAggregation{} var _ aggregation.Aggregation = NoopAggregation{}
func newProfiles() []Profile { func newProfiles() []Profile {
@ -150,7 +149,7 @@ func (n *Numbers) Points() []number.Number {
} }
// CheckedUpdate performs the same range test the SDK does on behalf of the aggregator. // CheckedUpdate performs the same range test the SDK does on behalf of the aggregator.
func CheckedUpdate(t *testing.T, agg export.Aggregator, number number.Number, descriptor *sdkapi.Descriptor) { func CheckedUpdate(t *testing.T, agg aggregator.Aggregator, number number.Number, descriptor *sdkapi.Descriptor) {
ctx := context.Background() ctx := context.Background()
// Note: Aggregator tests are written assuming that the SDK // Note: Aggregator tests are written assuming that the SDK
@ -166,7 +165,7 @@ func CheckedUpdate(t *testing.T, agg export.Aggregator, number number.Number, de
} }
} }
func CheckedMerge(t *testing.T, aggInto, aggFrom export.Aggregator, descriptor *sdkapi.Descriptor) { func CheckedMerge(t *testing.T, aggInto, aggFrom aggregator.Aggregator, descriptor *sdkapi.Descriptor) {
if err := aggInto.Merge(aggFrom, descriptor); err != nil { if err := aggInto.Merge(aggFrom, descriptor); err != nil {
t.Error("Unexpected Merge failure", err) t.Error("Unexpected Merge failure", err)
} }
@ -184,15 +183,15 @@ func (NoopAggregator) Update(context.Context, number.Number, *sdkapi.Descriptor)
return nil return nil
} }
func (NoopAggregator) SynchronizedMove(export.Aggregator, *sdkapi.Descriptor) error { func (NoopAggregator) SynchronizedMove(aggregator.Aggregator, *sdkapi.Descriptor) error {
return nil return nil
} }
func (NoopAggregator) Merge(export.Aggregator, *sdkapi.Descriptor) error { func (NoopAggregator) Merge(aggregator.Aggregator, *sdkapi.Descriptor) error {
return nil return nil
} }
func SynchronizedMoveResetTest(t *testing.T, mkind sdkapi.InstrumentKind, nf func(*sdkapi.Descriptor) export.Aggregator) { func SynchronizedMoveResetTest(t *testing.T, mkind sdkapi.InstrumentKind, nf func(*sdkapi.Descriptor) aggregator.Aggregator) {
t.Run("reset on nil", func(t *testing.T) { t.Run("reset on nil", func(t *testing.T) {
// Ensures that SynchronizedMove(nil, descriptor) discards and // Ensures that SynchronizedMove(nil, descriptor) discards and
// resets the aggregator. // resets the aggregator.

View File

@ -22,7 +22,6 @@ import (
"go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation" "go.opentelemetry.io/otel/sdk/metric/export/aggregation"
) )
@ -97,7 +96,7 @@ var defaultInt64ExplicitBoundaries = func(bounds []float64) (asint []float64) {
return return
}(defaultFloat64ExplicitBoundaries) }(defaultFloat64ExplicitBoundaries)
var _ export.Aggregator = &Aggregator{} var _ aggregator.Aggregator = &Aggregator{}
var _ aggregation.Sum = &Aggregator{} var _ aggregation.Sum = &Aggregator{}
var _ aggregation.Count = &Aggregator{} var _ aggregation.Count = &Aggregator{}
var _ aggregation.Histogram = &Aggregator{} var _ aggregation.Histogram = &Aggregator{}
@ -174,7 +173,7 @@ func (c *Aggregator) Histogram() (aggregation.Buckets, error) {
// the empty set. Since no locks are taken, there is a chance that // the empty set. Since no locks are taken, there is a chance that
// the independent Sum, Count and Bucket Count are not consistent with each // the independent Sum, Count and Bucket Count are not consistent with each
// other. // other.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *sdkapi.Descriptor) error { func (c *Aggregator) SynchronizedMove(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator) o, _ := oa.(*Aggregator)
if oa != nil && o == nil { if oa != nil && o == nil {
@ -254,7 +253,7 @@ func (c *Aggregator) Update(_ context.Context, number number.Number, desc *sdkap
} }
// Merge combines two histograms that have the same buckets into a single one. // Merge combines two histograms that have the same buckets into a single one.
func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error { func (c *Aggregator) Merge(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator) o, _ := oa.(*Aggregator)
if o == nil { if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa) return aggregator.NewInconsistentAggregatorError(c, oa)

View File

@ -25,9 +25,9 @@ import (
"go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/export"
) )
const count = 100 const count = 100
@ -240,7 +240,7 @@ func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest( aggregatortest.SynchronizedMoveResetTest(
t, t,
sdkapi.HistogramInstrumentKind, sdkapi.HistogramInstrumentKind,
func(desc *sdkapi.Descriptor) export.Aggregator { func(desc *sdkapi.Descriptor) aggregator.Aggregator {
return &histogram.New(1, desc, histogram.WithExplicitBoundaries(testBoundaries))[0] return &histogram.New(1, desc, histogram.WithExplicitBoundaries(testBoundaries))[0]
}, },
) )

View File

@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation" "go.opentelemetry.io/otel/sdk/metric/export/aggregation"
) )
@ -51,7 +50,7 @@ type (
} }
) )
var _ export.Aggregator = &Aggregator{} var _ aggregator.Aggregator = &Aggregator{}
var _ aggregation.LastValue = &Aggregator{} var _ aggregation.LastValue = &Aggregator{}
// An unset lastValue has zero timestamp and zero value. // An unset lastValue has zero timestamp and zero value.
@ -92,7 +91,7 @@ func (g *Aggregator) LastValue() (number.Number, time.Time, error) {
} }
// SynchronizedMove atomically saves the current value. // SynchronizedMove atomically saves the current value.
func (g *Aggregator) SynchronizedMove(oa export.Aggregator, _ *sdkapi.Descriptor) error { func (g *Aggregator) SynchronizedMove(oa aggregator.Aggregator, _ *sdkapi.Descriptor) error {
if oa == nil { if oa == nil {
atomic.StorePointer(&g.value, unsafe.Pointer(unsetLastValue)) atomic.StorePointer(&g.value, unsafe.Pointer(unsetLastValue))
return nil return nil
@ -117,7 +116,7 @@ func (g *Aggregator) Update(_ context.Context, number number.Number, desc *sdkap
// Merge combines state from two aggregators. The most-recently set // Merge combines state from two aggregators. The most-recently set
// value is chosen. // value is chosen.
func (g *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error { func (g *Aggregator) Merge(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator) o, _ := oa.(*Aggregator)
if o == nil { if o == nil {
return aggregator.NewInconsistentAggregatorError(g, oa) return aggregator.NewInconsistentAggregatorError(g, oa)

View File

@ -27,14 +27,14 @@ import (
ottest "go.opentelemetry.io/otel/internal/internaltest" ottest "go.opentelemetry.io/otel/internal/internaltest"
"go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation" "go.opentelemetry.io/otel/sdk/metric/export/aggregation"
) )
const count = 100 const count = 100
var _ export.Aggregator = &Aggregator{} var _ aggregator.Aggregator = &Aggregator{}
// Ensure struct alignment prior to running tests. // Ensure struct alignment prior to running tests.
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
@ -139,7 +139,7 @@ func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest( aggregatortest.SynchronizedMoveResetTest(
t, t,
sdkapi.GaugeObserverInstrumentKind, sdkapi.GaugeObserverInstrumentKind,
func(desc *sdkapi.Descriptor) export.Aggregator { func(desc *sdkapi.Descriptor) aggregator.Aggregator {
return &New(1)[0] return &New(1)[0]
}, },
) )

View File

@ -20,7 +20,6 @@ import (
"go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation" "go.opentelemetry.io/otel/sdk/metric/export/aggregation"
) )
@ -31,7 +30,7 @@ type Aggregator struct {
value number.Number value number.Number
} }
var _ export.Aggregator = &Aggregator{} var _ aggregator.Aggregator = &Aggregator{}
var _ aggregation.Sum = &Aggregator{} var _ aggregation.Sum = &Aggregator{}
// New returns a new counter aggregator implemented by atomic // New returns a new counter aggregator implemented by atomic
@ -59,7 +58,7 @@ func (c *Aggregator) Sum() (number.Number, error) {
// SynchronizedMove atomically saves the current value into oa and resets the // SynchronizedMove atomically saves the current value into oa and resets the
// current sum to zero. // current sum to zero.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *sdkapi.Descriptor) error { func (c *Aggregator) SynchronizedMove(oa aggregator.Aggregator, _ *sdkapi.Descriptor) error {
if oa == nil { if oa == nil {
c.value.SetRawAtomic(0) c.value.SetRawAtomic(0)
return nil return nil
@ -79,7 +78,7 @@ func (c *Aggregator) Update(_ context.Context, num number.Number, desc *sdkapi.D
} }
// Merge combines two counters by adding their sums. // Merge combines two counters by adding their sums.
func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error { func (c *Aggregator) Merge(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator) o, _ := oa.(*Aggregator)
if o == nil { if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa) return aggregator.NewInconsistentAggregatorError(c, oa)

View File

@ -24,8 +24,8 @@ import (
ottest "go.opentelemetry.io/otel/internal/internaltest" ottest "go.opentelemetry.io/otel/internal/internaltest"
"go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
"go.opentelemetry.io/otel/sdk/metric/export"
) )
const count = 100 const count = 100
@ -147,7 +147,7 @@ func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest( aggregatortest.SynchronizedMoveResetTest(
t, t,
sdkapi.CounterObserverInstrumentKind, sdkapi.CounterObserverInstrumentKind,
func(desc *sdkapi.Descriptor) export.Aggregator { func(desc *sdkapi.Descriptor) aggregator.Aggregator {
return &New(1)[0] return &New(1)[0]
}, },
) )

View File

@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
metricsdk "go.opentelemetry.io/otel/sdk/metric" metricsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation" "go.opentelemetry.io/otel/sdk/metric/export/aggregation"
"go.opentelemetry.io/otel/sdk/metric/processor/processortest" "go.opentelemetry.io/otel/sdk/metric/processor/processortest"
@ -72,7 +73,7 @@ type testSelector struct {
newAggCount int newAggCount int
} }
func (ts *testSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) { func (ts *testSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...*aggregator.Aggregator) {
ts.newAggCount += len(aggPtrs) ts.newAggCount += len(aggPtrs)
processortest.AggregatorSelector().AggregatorFor(desc, aggPtrs...) processortest.AggregatorSelector().AggregatorFor(desc, aggPtrs...)
} }

View File

@ -20,9 +20,9 @@ import (
"time" "time"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation" "go.opentelemetry.io/otel/sdk/metric/export/aggregation"
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
) )
@ -94,7 +94,7 @@ type AggregatorSelector interface {
// Note: This is context-free because the aggregator should // Note: This is context-free because the aggregator should
// not relate to the incoming context. This call should not // not relate to the incoming context. This call should not
// block. // block.
AggregatorFor(descriptor *sdkapi.Descriptor, aggregator ...*Aggregator) AggregatorFor(descriptor *sdkapi.Descriptor, aggregator ...*aggregator.Aggregator)
} }
// Checkpointer is the interface used by a Controller to coordinate // Checkpointer is the interface used by a Controller to coordinate
@ -130,68 +130,6 @@ type CheckpointerFactory interface {
NewCheckpointer() Checkpointer NewCheckpointer() Checkpointer
} }
// Aggregator implements a specific aggregation behavior, e.g., a
// behavior to track a sequence of updates to an instrument. Counter
// instruments commonly use a simple Sum aggregator, but for the
// distribution instruments (Histogram, GaugeObserver) there are a
// number of possible aggregators with different cost and accuracy
// tradeoffs.
//
// Note that any Aggregator may be attached to any instrument--this is
// the result of the OpenTelemetry API/SDK separation. It is possible
// to attach a Sum aggregator to a Histogram instrument.
type Aggregator interface {
// Aggregation returns an Aggregation interface to access the
// current state of this Aggregator. The caller is
// responsible for synchronization and must not call any the
// other methods in this interface concurrently while using
// the Aggregation.
Aggregation() aggregation.Aggregation
// Update receives a new measured value and incorporates it
// into the aggregation. Update() calls may be called
// concurrently.
//
// Descriptor.NumberKind() should be consulted to determine
// whether the provided number is an int64 or float64.
//
// The Context argument comes from user-level code and could be
// inspected for a `correlation.Map` or `trace.SpanContext`.
Update(ctx context.Context, number number.Number, descriptor *sdkapi.Descriptor) error
// SynchronizedMove is called during collection to finish one
// period of aggregation by atomically saving the
// currently-updating state into the argument Aggregator AND
// resetting the current value to the zero state.
//
// SynchronizedMove() is called concurrently with Update(). These
// two methods must be synchronized with respect to each
// other, for correctness.
//
// After saving a synchronized copy, the Aggregator can be converted
// into one or more of the interfaces in the `aggregation` sub-package,
// according to kind of Aggregator that was selected.
//
// This method will return an InconsistentAggregatorError if
// this Aggregator cannot be copied into the destination due
// to an incompatible type.
//
// This call has no Context argument because it is expected to
// perform only computation.
//
// When called with a nil `destination`, this Aggregator is reset
// and the current value is discarded.
SynchronizedMove(destination Aggregator, descriptor *sdkapi.Descriptor) error
// Merge combines the checkpointed state from the argument
// Aggregator into this Aggregator. Merge is not synchronized
// with respect to Update or SynchronizedMove.
//
// The owner of an Aggregator being merged is responsible for
// synchronization of both Aggregator states.
Merge(aggregator Aggregator, descriptor *sdkapi.Descriptor) error
}
// Exporter handles presentation of the checkpoint of aggregate // Exporter handles presentation of the checkpoint of aggregate
// metrics. This is the final stage of a metrics export pipeline, // metrics. This is the final stage of a metrics export pipeline,
// where metric data are formatted for a specific system. // where metric data are formatted for a specific system.
@ -267,7 +205,7 @@ type Metadata struct {
// and label set, as prepared by an Accumulator for the Processor. // and label set, as prepared by an Accumulator for the Processor.
type Accumulation struct { type Accumulation struct {
Metadata Metadata
aggregator Aggregator aggregator aggregator.Aggregator
} }
// Record contains the exported data for a single metric instrument // Record contains the exported data for a single metric instrument
@ -295,7 +233,7 @@ func (m Metadata) Labels() *attribute.Set {
// Accumulations to send to Processors. The Descriptor, Labels, // Accumulations to send to Processors. The Descriptor, Labels,
// and Aggregator represent aggregate metric events received over a single // and Aggregator represent aggregate metric events received over a single
// collection period. // collection period.
func NewAccumulation(descriptor *sdkapi.Descriptor, labels *attribute.Set, aggregator Aggregator) Accumulation { func NewAccumulation(descriptor *sdkapi.Descriptor, labels *attribute.Set, aggregator aggregator.Aggregator) Accumulation {
return Accumulation{ return Accumulation{
Metadata: Metadata{ Metadata: Metadata{
descriptor: descriptor, descriptor: descriptor,
@ -307,7 +245,7 @@ func NewAccumulation(descriptor *sdkapi.Descriptor, labels *attribute.Set, aggre
// Aggregator returns the checkpointed aggregator. It is safe to // Aggregator returns the checkpointed aggregator. It is safe to
// access the checkpointed state without locking. // access the checkpointed state without locking.
func (r Accumulation) Aggregator() Aggregator { func (r Accumulation) Aggregator() aggregator.Aggregator {
return r.aggregator return r.aggregator
} }

View File

@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation" "go.opentelemetry.io/otel/sdk/metric/export/aggregation"
) )
@ -74,12 +75,12 @@ type (
// (if !currentOwned) or it refers to an Aggregator // (if !currentOwned) or it refers to an Aggregator
// owned by the processor used to accumulate multiple // owned by the processor used to accumulate multiple
// values in a single collection round. // values in a single collection round.
current export.Aggregator current aggregator.Aggregator
// cumulative, if non-nil, refers to an Aggregator owned // cumulative, if non-nil, refers to an Aggregator owned
// by the processor used to store the last cumulative // by the processor used to store the last cumulative
// value. // value.
cumulative export.Aggregator cumulative aggregator.Aggregator
} }
state struct { state struct {

View File

@ -31,6 +31,7 @@ import (
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/instrumentation"
sdk "go.opentelemetry.io/otel/sdk/metric" sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
"go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation" "go.opentelemetry.io/otel/sdk/metric/export/aggregation"
@ -110,7 +111,7 @@ func asNumber(nkind number.Kind, value int64) number.Number {
func updateFor(t *testing.T, desc *sdkapi.Descriptor, selector export.AggregatorSelector, value int64, labs ...attribute.KeyValue) export.Accumulation { func updateFor(t *testing.T, desc *sdkapi.Descriptor, selector export.AggregatorSelector, value int64, labs ...attribute.KeyValue) export.Accumulation {
ls := attribute.NewSet(labs...) ls := attribute.NewSet(labs...)
var agg export.Aggregator var agg aggregator.Aggregator
selector.AggregatorFor(desc, &agg) selector.AggregatorFor(desc, &agg)
require.NoError(t, agg.Update(context.Background(), asNumber(desc.NumberKind(), value), desc)) require.NoError(t, agg.Update(context.Background(), asNumber(desc.NumberKind(), value), desc))

View File

@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
@ -47,7 +48,7 @@ type (
mapValue struct { mapValue struct {
labels *attribute.Set labels *attribute.Set
resource *resource.Resource resource *resource.Resource
aggregator export.Aggregator aggregator aggregator.Aggregator
} }
// Output implements export.Reader. // Output implements export.Reader.
@ -178,7 +179,7 @@ func AggregatorSelector() export.AggregatorSelector {
} }
// AggregatorFor implements export.AggregatorSelector. // AggregatorFor implements export.AggregatorSelector.
func (testAggregatorSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) { func (testAggregatorSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...*aggregator.Aggregator) {
switch { switch {
case strings.HasSuffix(desc.Name(), ".disabled"): case strings.HasSuffix(desc.Name(), ".disabled"):
@ -251,7 +252,7 @@ func (o *Output) AddRecordWithResource(rec export.Record, res *resource.Resource
resource: res.Equivalent(), resource: res.Equivalent(),
} }
if _, ok := o.m[key]; !ok { if _, ok := o.m[key]; !ok {
var agg export.Aggregator var agg aggregator.Aggregator
testAggregatorSelector{}.AggregatorFor(rec.Descriptor(), &agg) testAggregatorSelector{}.AggregatorFor(rec.Descriptor(), &agg)
o.m[key] = mapValue{ o.m[key] = mapValue{
aggregator: agg, aggregator: agg,
@ -259,7 +260,7 @@ func (o *Output) AddRecordWithResource(rec export.Record, res *resource.Resource
resource: res, resource: res,
} }
} }
return o.m[key].aggregator.Merge(rec.Aggregation().(export.Aggregator), rec.Descriptor()) return o.m[key].aggregator.Merge(rec.Aggregation().(aggregator.Aggregator), rec.Descriptor())
} }
// Map returns the calculated values for test validation from a set of // Map returns the calculated values for test validation from a set of

View File

@ -114,8 +114,8 @@ type (
// current implements the actual RecordOne() API, // current implements the actual RecordOne() API,
// depending on the type of aggregation. If nil, the // depending on the type of aggregation. If nil, the
// metric was disabled by the exporter. // metric was disabled by the exporter.
current export.Aggregator current aggregator.Aggregator
checkpoint export.Aggregator checkpoint aggregator.Aggregator
} }
instrument struct { instrument struct {
@ -133,7 +133,7 @@ type (
labeledRecorder struct { labeledRecorder struct {
observedEpoch int64 observedEpoch int64
labels *attribute.Set labels *attribute.Set
observed export.Aggregator observed aggregator.Aggregator
} }
) )
@ -175,7 +175,7 @@ func (a *asyncInstrument) observe(num number.Number, labels *attribute.Set) {
} }
} }
func (a *asyncInstrument) getRecorder(labels *attribute.Set) export.Aggregator { func (a *asyncInstrument) getRecorder(labels *attribute.Set) aggregator.Aggregator {
lrec, ok := a.recorders[labels.Equivalent()] lrec, ok := a.recorders[labels.Equivalent()]
if ok { if ok {
// Note: SynchronizedMove(nil) can't return an error // Note: SynchronizedMove(nil) can't return an error
@ -184,7 +184,7 @@ func (a *asyncInstrument) getRecorder(labels *attribute.Set) export.Aggregator {
a.recorders[labels.Equivalent()] = lrec a.recorders[labels.Equivalent()] = lrec
return lrec.observed return lrec.observed
} }
var rec export.Aggregator var rec aggregator.Aggregator
a.meter.processor.AggregatorFor(&a.descriptor, &rec) a.meter.processor.AggregatorFor(&a.descriptor, &rec)
if a.recorders == nil { if a.recorders == nil {
a.recorders = make(map[attribute.Distinct]*labeledRecorder) a.recorders = make(map[attribute.Distinct]*labeledRecorder)

View File

@ -16,6 +16,7 @@ package simple // import "go.opentelemetry.io/otel/sdk/metric/selector/simple"
import ( import (
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
@ -50,21 +51,21 @@ func NewWithHistogramDistribution(options ...histogram.Option) export.Aggregator
return selectorHistogram{options: options} return selectorHistogram{options: options}
} }
func sumAggs(aggPtrs []*export.Aggregator) { func sumAggs(aggPtrs []*aggregator.Aggregator) {
aggs := sum.New(len(aggPtrs)) aggs := sum.New(len(aggPtrs))
for i := range aggPtrs { for i := range aggPtrs {
*aggPtrs[i] = &aggs[i] *aggPtrs[i] = &aggs[i]
} }
} }
func lastValueAggs(aggPtrs []*export.Aggregator) { func lastValueAggs(aggPtrs []*aggregator.Aggregator) {
aggs := lastvalue.New(len(aggPtrs)) aggs := lastvalue.New(len(aggPtrs))
for i := range aggPtrs { for i := range aggPtrs {
*aggPtrs[i] = &aggs[i] *aggPtrs[i] = &aggs[i]
} }
} }
func (selectorInexpensive) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) { func (selectorInexpensive) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs ...*aggregator.Aggregator) {
switch descriptor.InstrumentKind() { switch descriptor.InstrumentKind() {
case sdkapi.GaugeObserverInstrumentKind: case sdkapi.GaugeObserverInstrumentKind:
lastValueAggs(aggPtrs) lastValueAggs(aggPtrs)
@ -78,7 +79,7 @@ func (selectorInexpensive) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs
} }
} }
func (s selectorHistogram) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) { func (s selectorHistogram) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs ...*aggregator.Aggregator) {
switch descriptor.InstrumentKind() { switch descriptor.InstrumentKind() {
case sdkapi.GaugeObserverInstrumentKind: case sdkapi.GaugeObserverInstrumentKind:
lastValueAggs(aggPtrs) lastValueAggs(aggPtrs)

View File

@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/otel/metric/metrictest" "go.opentelemetry.io/otel/metric/metrictest"
"go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
@ -38,8 +39,8 @@ var (
testGaugeObserverDesc = metrictest.NewDescriptor("gauge", sdkapi.GaugeObserverInstrumentKind, number.Int64Kind) testGaugeObserverDesc = metrictest.NewDescriptor("gauge", sdkapi.GaugeObserverInstrumentKind, number.Int64Kind)
) )
func oneAgg(sel export.AggregatorSelector, desc *sdkapi.Descriptor) export.Aggregator { func oneAgg(sel export.AggregatorSelector, desc *sdkapi.Descriptor) aggregator.Aggregator {
var agg export.Aggregator var agg aggregator.Aggregator
sel.AggregatorFor(desc, &agg) sel.AggregatorFor(desc, &agg)
return agg return agg
} }