mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2024-12-04 09:43:23 +02:00
Rename AggregationSelector to AggregatorSelector (#859)
This commit is contained in:
parent
ea53fb4d1a
commit
0382850707
@ -34,7 +34,7 @@ var Must = metric.Must
|
||||
// benchFixture is copied from sdk/metric/benchmark_test.go.
|
||||
// TODO refactor to share this code.
|
||||
type benchFixture struct {
|
||||
export.AggregationSelector
|
||||
export.AggregatorSelector
|
||||
accumulator *sdk.Accumulator
|
||||
meter metric.Meter
|
||||
B *testing.B
|
||||
@ -45,8 +45,8 @@ var _ metric.Provider = &benchFixture{}
|
||||
func newFixture(b *testing.B) *benchFixture {
|
||||
b.ReportAllocs()
|
||||
bf := &benchFixture{
|
||||
B: b,
|
||||
AggregationSelector: test.AggregationSelector(),
|
||||
B: b,
|
||||
AggregatorSelector: test.AggregatorSelector(),
|
||||
}
|
||||
|
||||
bf.accumulator = sdk.NewAccumulator(bf)
|
||||
|
@ -28,7 +28,7 @@ import (
|
||||
)
|
||||
|
||||
// Integrator is responsible for deciding which kind of aggregation to
|
||||
// use (via AggregationSelector), gathering exported results from the
|
||||
// use (via AggregatorSelector), gathering exported results from the
|
||||
// SDK during collection, and deciding over which dimensions to group
|
||||
// the exported data.
|
||||
//
|
||||
@ -36,7 +36,7 @@ import (
|
||||
// the sole responsibility of determining which Aggregator to use for
|
||||
// each record.
|
||||
//
|
||||
// The embedded AggregationSelector interface is called (concurrently)
|
||||
// The embedded AggregatorSelector interface is called (concurrently)
|
||||
// in instrumentation context to select the appropriate Aggregator for
|
||||
// an instrument.
|
||||
//
|
||||
@ -45,7 +45,7 @@ import (
|
||||
// checkpointed, allowing the integrator to build the set of metrics
|
||||
// currently being exported.
|
||||
type Integrator interface {
|
||||
// AggregationSelector is responsible for selecting the
|
||||
// AggregatorSelector is responsible for selecting the
|
||||
// concrete type of Aggregator used for a metric in the SDK.
|
||||
//
|
||||
// This may be a static decision based on fields of the
|
||||
@ -62,7 +62,7 @@ type Integrator interface {
|
||||
// Note that the SDK only calls AggregatorFor when new records
|
||||
// require an Aggregator. This does not provide a way to
|
||||
// disable metrics with active records.
|
||||
AggregationSelector
|
||||
AggregatorSelector
|
||||
|
||||
// Process is called by the SDK once per internal record,
|
||||
// passing the export Accumulation (a Descriptor, the corresponding
|
||||
@ -74,9 +74,9 @@ type Integrator interface {
|
||||
Process(Accumulation) error
|
||||
}
|
||||
|
||||
// AggregationSelector supports selecting the kind of Aggregator to
|
||||
// AggregatorSelector supports selecting the kind of Aggregator to
|
||||
// use at runtime for a specific metric instrument.
|
||||
type AggregationSelector interface {
|
||||
type AggregatorSelector interface {
|
||||
// AggregatorFor allocates a variable number of aggregators of
|
||||
// a kind suitable for the requested export. This method
|
||||
// initializes a `...*Aggregator`, to support making a single
|
||||
|
@ -32,14 +32,14 @@ type benchFixture struct {
|
||||
meter metric.MeterMust
|
||||
accumulator *sdk.Accumulator
|
||||
B *testing.B
|
||||
export.AggregationSelector
|
||||
export.AggregatorSelector
|
||||
}
|
||||
|
||||
func newFixture(b *testing.B) *benchFixture {
|
||||
b.ReportAllocs()
|
||||
bf := &benchFixture{
|
||||
B: b,
|
||||
AggregationSelector: test.AggregationSelector(),
|
||||
B: b,
|
||||
AggregatorSelector: test.AggregatorSelector(),
|
||||
}
|
||||
|
||||
bf.accumulator = sdk.NewAccumulator(bf)
|
||||
|
@ -45,7 +45,7 @@ type Controller struct {
|
||||
}
|
||||
|
||||
// New returns a *Controller configured with an aggregation selector and options.
|
||||
func New(aselector export.AggregationSelector, eselector export.ExportKindSelector, options ...Option) *Controller {
|
||||
func New(aselector export.AggregatorSelector, eselector export.ExportKindSelector, options ...Option) *Controller {
|
||||
config := &Config{
|
||||
Resource: resource.Empty(),
|
||||
CachePeriod: DefaultCachePeriod,
|
||||
|
@ -49,7 +49,7 @@ type Controller struct {
|
||||
// New constructs a Controller, an implementation of metric.Provider,
|
||||
// using the provided exporter and options to configure an SDK with
|
||||
// periodic collection.
|
||||
func New(selector export.AggregationSelector, exporter export.Exporter, opts ...Option) *Controller {
|
||||
func New(selector export.AggregatorSelector, exporter export.Exporter, opts ...Option) *Controller {
|
||||
c := &Config{
|
||||
Period: DefaultPushPeriod,
|
||||
}
|
||||
|
@ -125,7 +125,7 @@ func (e *testExporter) resetRecords() ([]export.Record, int) {
|
||||
|
||||
func TestPushDoubleStop(t *testing.T) {
|
||||
fix := newFixture(t)
|
||||
p := push.New(integratorTest.AggregationSelector(), fix.exporter)
|
||||
p := push.New(integratorTest.AggregatorSelector(), fix.exporter)
|
||||
p.Start()
|
||||
p.Stop()
|
||||
p.Stop()
|
||||
@ -133,7 +133,7 @@ func TestPushDoubleStop(t *testing.T) {
|
||||
|
||||
func TestPushDoubleStart(t *testing.T) {
|
||||
fix := newFixture(t)
|
||||
p := push.New(test.AggregationSelector(), fix.exporter)
|
||||
p := push.New(test.AggregatorSelector(), fix.exporter)
|
||||
p.Start()
|
||||
p.Start()
|
||||
p.Stop()
|
||||
@ -143,7 +143,7 @@ func TestPushTicker(t *testing.T) {
|
||||
fix := newFixture(t)
|
||||
|
||||
p := push.New(
|
||||
test.AggregationSelector(),
|
||||
test.AggregatorSelector(),
|
||||
fix.exporter,
|
||||
push.WithPeriod(time.Second),
|
||||
push.WithResource(testResource),
|
||||
@ -224,7 +224,7 @@ func TestPushExportError(t *testing.T) {
|
||||
fix.exporter.injectErr = injector("counter1.sum", tt.injectedError)
|
||||
|
||||
p := push.New(
|
||||
test.AggregationSelector(),
|
||||
test.AggregatorSelector(),
|
||||
fix.exporter,
|
||||
push.WithPeriod(time.Second),
|
||||
push.WithResource(testResource),
|
||||
|
@ -78,20 +78,20 @@ type correctnessIntegrator struct {
|
||||
}
|
||||
|
||||
type testSelector struct {
|
||||
selector export.AggregationSelector
|
||||
selector export.AggregatorSelector
|
||||
newAggCount int
|
||||
}
|
||||
|
||||
func (ts *testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) {
|
||||
ts.newAggCount += len(aggPtrs)
|
||||
test.AggregationSelector().AggregatorFor(desc, aggPtrs...)
|
||||
test.AggregatorSelector().AggregatorFor(desc, aggPtrs...)
|
||||
}
|
||||
|
||||
func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessIntegrator) {
|
||||
testHandler.Reset()
|
||||
integrator := &correctnessIntegrator{
|
||||
t: t,
|
||||
testSelector: &testSelector{selector: test.AggregationSelector()},
|
||||
testSelector: &testSelector{selector: test.AggregatorSelector()},
|
||||
}
|
||||
accum := metricsdk.NewAccumulator(
|
||||
integrator,
|
||||
|
@ -74,13 +74,14 @@ export pipeline, containing the name, units, description, metric kind,
|
||||
number kind (int64 or float64). A Descriptor accompanies metric data
|
||||
as it passes through the export pipeline.
|
||||
|
||||
The AggregationSelector interface supports choosing the method of
|
||||
aggregation to apply to a particular instrument. Given the Descriptor,
|
||||
this AggregatorFor method returns an implementation of Aggregator. If this
|
||||
The AggregatorSelector interface supports choosing the method of
|
||||
aggregation to apply to a particular instrument, by delegating the
|
||||
construction of an Aggregator to this interface. Given the Descriptor,
|
||||
the AggregatorFor method returns an implementation of Aggregator. If this
|
||||
interface returns nil, the metric will be disabled. The aggregator should
|
||||
be matched to the capabilities of the exporter. Selecting the aggregator
|
||||
for sum-only instruments is relatively straightforward, but many options
|
||||
are available for aggregating distributions from ValueRecorder instruments.
|
||||
for Adding instruments is relatively straightforward, but many options
|
||||
are available for aggregating distributions from Grouping instruments.
|
||||
|
||||
Aggregator is an interface which implements a concrete strategy for
|
||||
aggregating metric updates. Several Aggregator implementations are
|
||||
@ -94,7 +95,7 @@ context, that combines state from two aggregators into one. Each SDK
|
||||
record has an associated aggregator.
|
||||
|
||||
Integrator is an interface which sits between the SDK and an exporter.
|
||||
The Integrator embeds an AggregationSelector, used by the SDK to assign
|
||||
The Integrator embeds an AggregatorSelector, used by the SDK to assign
|
||||
new Aggregators. The Integrator supports a Process() API for submitting
|
||||
checkpointed aggregators to the integrator, and a CheckpointSet() API
|
||||
for producing a complete checkpoint for the exporter. Two default
|
||||
|
@ -30,7 +30,7 @@ import (
|
||||
type (
|
||||
Integrator struct {
|
||||
export.ExportKindSelector
|
||||
export.AggregationSelector
|
||||
export.AggregatorSelector
|
||||
|
||||
state
|
||||
}
|
||||
@ -89,15 +89,15 @@ var ErrInconsistentState = fmt.Errorf("inconsistent integrator state")
|
||||
var ErrInvalidExporterKind = fmt.Errorf("invalid exporter kind")
|
||||
|
||||
// New returns a basic Integrator using the provided
|
||||
// AggregationSelector to select Aggregators. The ExportKindSelector
|
||||
// AggregatorSelector to select Aggregators. The ExportKindSelector
|
||||
// is consulted to determine the kind(s) of exporter that will consume
|
||||
// data, so that this Integrator can prepare to compute Delta or
|
||||
// Cumulative Aggregations as needed.
|
||||
func New(aselector export.AggregationSelector, eselector export.ExportKindSelector) *Integrator {
|
||||
func New(aselector export.AggregatorSelector, eselector export.ExportKindSelector) *Integrator {
|
||||
now := time.Now()
|
||||
return &Integrator{
|
||||
AggregationSelector: aselector,
|
||||
ExportKindSelector: eselector,
|
||||
AggregatorSelector: aselector,
|
||||
ExportKindSelector: eselector,
|
||||
state: state{
|
||||
values: map[stateKey]*stateValue{},
|
||||
processStart: now,
|
||||
@ -206,7 +206,7 @@ func (b *Integrator) Process(accum export.Accumulation) error {
|
||||
// and it would be allocated in this block when multiple
|
||||
// accumulators are used and the first condition is not
|
||||
// met.
|
||||
b.AggregationSelector.AggregatorFor(desc, &value.delta)
|
||||
b.AggregatorSelector.AggregatorFor(desc, &value.delta)
|
||||
}
|
||||
if value.current != value.delta {
|
||||
// If the current and delta Aggregators are not the same it
|
||||
|
@ -266,19 +266,19 @@ func (bogusExporter) Export(context.Context, export.CheckpointSet) error {
|
||||
|
||||
func TestBasicInconsistent(t *testing.T) {
|
||||
// Test double-start
|
||||
b := basic.New(test.AggregationSelector(), export.PassThroughExporter)
|
||||
b := basic.New(test.AggregatorSelector(), export.PassThroughExporter)
|
||||
|
||||
b.StartCollection()
|
||||
b.StartCollection()
|
||||
require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())
|
||||
|
||||
// Test finish without start
|
||||
b = basic.New(test.AggregationSelector(), export.PassThroughExporter)
|
||||
b = basic.New(test.AggregatorSelector(), export.PassThroughExporter)
|
||||
|
||||
require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())
|
||||
|
||||
// Test no finish
|
||||
b = basic.New(test.AggregationSelector(), export.PassThroughExporter)
|
||||
b = basic.New(test.AggregatorSelector(), export.PassThroughExporter)
|
||||
|
||||
b.StartCollection()
|
||||
require.Equal(
|
||||
@ -291,14 +291,14 @@ func TestBasicInconsistent(t *testing.T) {
|
||||
)
|
||||
|
||||
// Test no start
|
||||
b = basic.New(test.AggregationSelector(), export.PassThroughExporter)
|
||||
b = basic.New(test.AggregatorSelector(), export.PassThroughExporter)
|
||||
|
||||
desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind)
|
||||
accum := export.NewAccumulation(&desc, label.EmptySet(), resource.Empty(), exportTest.NoopAggregator{})
|
||||
require.Equal(t, basic.ErrInconsistentState, b.Process(accum))
|
||||
|
||||
// Test invalid kind:
|
||||
b = basic.New(test.AggregationSelector(), export.PassThroughExporter)
|
||||
b = basic.New(test.AggregatorSelector(), export.PassThroughExporter)
|
||||
b.StartCollection()
|
||||
require.NoError(t, b.Process(accum))
|
||||
require.NoError(t, b.FinishCollection())
|
||||
@ -313,7 +313,7 @@ func TestBasicInconsistent(t *testing.T) {
|
||||
|
||||
func TestBasicTimestamps(t *testing.T) {
|
||||
beforeNew := time.Now()
|
||||
b := basic.New(test.AggregationSelector(), export.PassThroughExporter)
|
||||
b := basic.New(test.AggregatorSelector(), export.PassThroughExporter)
|
||||
afterNew := time.Now()
|
||||
|
||||
desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind)
|
||||
|
@ -38,10 +38,10 @@ type (
|
||||
labelEncoder label.Encoder
|
||||
}
|
||||
|
||||
// testAggregationSelector returns aggregators consistent with
|
||||
// testAggregatorSelector returns aggregators consistent with
|
||||
// the test variables below, needed for testing stateful
|
||||
// integrators, which clone Aggregators using AggregatorFor(desc).
|
||||
testAggregationSelector struct{}
|
||||
testAggregatorSelector struct{}
|
||||
)
|
||||
|
||||
func NewOutput(labelEncoder label.Encoder) Output {
|
||||
@ -51,14 +51,14 @@ func NewOutput(labelEncoder label.Encoder) Output {
|
||||
}
|
||||
}
|
||||
|
||||
// AggregationSelector returns a policy that is consistent with the
|
||||
// AggregatorSelector returns a policy that is consistent with the
|
||||
// test descriptors above. I.e., it returns sum.New() for counter
|
||||
// instruments and lastvalue.New() for lastValue instruments.
|
||||
func AggregationSelector() export.AggregationSelector {
|
||||
return testAggregationSelector{}
|
||||
func AggregatorSelector() export.AggregatorSelector {
|
||||
return testAggregatorSelector{}
|
||||
}
|
||||
|
||||
func (testAggregationSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) {
|
||||
func (testAggregatorSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) {
|
||||
|
||||
switch {
|
||||
case strings.HasSuffix(desc.Name(), ".disabled"):
|
||||
@ -96,7 +96,7 @@ func (testAggregationSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ..
|
||||
*aggPtrs[i] = &aggs[i]
|
||||
}
|
||||
default:
|
||||
panic(fmt.Sprint("Invalid instrument name for test AggregationSelector: ", desc.Name()))
|
||||
panic(fmt.Sprint("Invalid instrument name for test AggregatorSelector: ", desc.Name()))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -171,7 +171,7 @@ func (a *asyncInstrument) observe(number api.Number, labels *label.Set) {
|
||||
recorder := a.getRecorder(labels)
|
||||
if recorder == nil {
|
||||
// The instrument is disabled according to the
|
||||
// AggregationSelector.
|
||||
// AggregatorSelector.
|
||||
return
|
||||
}
|
||||
if err := recorder.Update(context.Background(), number, &a.descriptor); err != nil {
|
||||
@ -509,7 +509,7 @@ func (m *Accumulator) RecordBatch(ctx context.Context, kvs []kv.KeyValue, measur
|
||||
// RecordOne implements api.SyncImpl.
|
||||
func (r *record) RecordOne(ctx context.Context, number api.Number) {
|
||||
if r.current == nil {
|
||||
// The instrument is disabled according to the AggregationSelector.
|
||||
// The instrument is disabled according to the AggregatorSelector.
|
||||
return
|
||||
}
|
||||
if err := aggregator.RangeTest(number, &r.inst.descriptor); err != nil {
|
||||
|
@ -36,10 +36,10 @@ type (
|
||||
)
|
||||
|
||||
var (
|
||||
_ export.AggregationSelector = selectorInexpensive{}
|
||||
_ export.AggregationSelector = selectorSketch{}
|
||||
_ export.AggregationSelector = selectorExact{}
|
||||
_ export.AggregationSelector = selectorHistogram{}
|
||||
_ export.AggregatorSelector = selectorInexpensive{}
|
||||
_ export.AggregatorSelector = selectorSketch{}
|
||||
_ export.AggregatorSelector = selectorExact{}
|
||||
_ export.AggregatorSelector = selectorHistogram{}
|
||||
)
|
||||
|
||||
// NewWithInexpensiveDistribution returns a simple aggregation selector
|
||||
@ -47,7 +47,7 @@ var (
|
||||
// for the three kinds of metric. This selector is faster and uses
|
||||
// less memory than the others because minmaxsumcount does not
|
||||
// aggregate quantile information.
|
||||
func NewWithInexpensiveDistribution() export.AggregationSelector {
|
||||
func NewWithInexpensiveDistribution() export.AggregatorSelector {
|
||||
return selectorInexpensive{}
|
||||
}
|
||||
|
||||
@ -56,7 +56,7 @@ func NewWithInexpensiveDistribution() export.AggregationSelector {
|
||||
// kinds of metric. This selector uses more cpu and memory than the
|
||||
// NewWithInexpensiveDistribution because it uses one DDSketch per distinct
|
||||
// instrument and label set.
|
||||
func NewWithSketchDistribution(config *ddsketch.Config) export.AggregationSelector {
|
||||
func NewWithSketchDistribution(config *ddsketch.Config) export.AggregatorSelector {
|
||||
return selectorSketch{
|
||||
config: config,
|
||||
}
|
||||
@ -67,7 +67,7 @@ func NewWithSketchDistribution(config *ddsketch.Config) export.AggregationSelect
|
||||
// This selector uses more memory than the NewWithSketchDistribution
|
||||
// because it aggregates an array of all values, therefore is able to
|
||||
// compute exact quantiles.
|
||||
func NewWithExactDistribution() export.AggregationSelector {
|
||||
func NewWithExactDistribution() export.AggregatorSelector {
|
||||
return selectorExact{}
|
||||
}
|
||||
|
||||
@ -75,7 +75,7 @@ func NewWithExactDistribution() export.AggregationSelector {
|
||||
// histogram, and histogram aggregators for the three kinds of metric. This
|
||||
// selector uses more memory than the NewWithInexpensiveDistribution because it
|
||||
// uses a counter per bucket.
|
||||
func NewWithHistogramDistribution(boundaries []float64) export.AggregationSelector {
|
||||
func NewWithHistogramDistribution(boundaries []float64) export.AggregatorSelector {
|
||||
return selectorHistogram{boundaries: boundaries}
|
||||
}
|
||||
|
||||
|
@ -35,7 +35,7 @@ var (
|
||||
testValueObserverDesc = metric.NewDescriptor("valueobserver", metric.ValueObserverKind, metric.Int64NumberKind)
|
||||
)
|
||||
|
||||
func oneAgg(sel export.AggregationSelector, desc *metric.Descriptor) export.Aggregator {
|
||||
func oneAgg(sel export.AggregatorSelector, desc *metric.Descriptor) export.Aggregator {
|
||||
var agg export.Aggregator
|
||||
sel.AggregatorFor(desc, &agg)
|
||||
return agg
|
||||
|
@ -58,7 +58,7 @@ type (
|
||||
impl testImpl
|
||||
T *testing.T
|
||||
|
||||
export.AggregationSelector
|
||||
export.AggregatorSelector
|
||||
|
||||
lock sync.Mutex
|
||||
lused map[string]bool
|
||||
@ -287,10 +287,10 @@ func stressTest(t *testing.T, impl testImpl) {
|
||||
ctx := context.Background()
|
||||
t.Parallel()
|
||||
fixture := &testFixture{
|
||||
T: t,
|
||||
impl: impl,
|
||||
lused: map[string]bool{},
|
||||
AggregationSelector: test.AggregationSelector(),
|
||||
T: t,
|
||||
impl: impl,
|
||||
lused: map[string]bool{},
|
||||
AggregatorSelector: test.AggregatorSelector(),
|
||||
}
|
||||
cc := concurrency()
|
||||
sdk := NewAccumulator(fixture)
|
||||
|
Loading…
Reference in New Issue
Block a user