1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-02-03 13:11:53 +02:00

Metric SDK renames to align with draft SDK spec (#710)

* Replace batcher w/ integrator; rename ungrouped to simple

* SDK -> Accumulator

* Cleamup

* Address feedback
This commit is contained in:
Joshua MacDonald 2020-05-11 10:23:06 -07:00 committed by GitHub
parent f0855b7d08
commit 64afb05e53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 165 additions and 164 deletions

View File

@ -38,9 +38,9 @@ var Must = metric.Must
// benchFixture is copied from sdk/metric/benchmark_test.go.
// TODO refactor to share this code.
type benchFixture struct {
sdk *sdk.SDK
meter metric.Meter
B *testing.B
accumulator *sdk.Accumulator
meter metric.Meter
B *testing.B
}
var _ metric.Provider = &benchFixture{}
@ -51,8 +51,8 @@ func newFixture(b *testing.B) *benchFixture {
B: b,
}
bf.sdk = sdk.New(bf)
bf.meter = metric.WrapMeterImpl(bf.sdk, "test")
bf.accumulator = sdk.NewAccumulator(bf)
bf.meter = metric.WrapMeterImpl(bf.accumulator, "test")
return bf
}

View File

@ -29,8 +29,8 @@ import (
"go.opentelemetry.io/otel/api/label"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"
)
@ -145,7 +145,7 @@ func InstallNewPipeline(config Config) (*push.Controller, http.HandlerFunc, erro
}
// NewExportPipeline sets up a complete export pipeline with the recommended setup,
// chaining a NewRawExporter into the recommended selectors and batchers.
// chaining a NewRawExporter into the recommended selectors and integrators.
func NewExportPipeline(config Config, period time.Duration) (*push.Controller, http.HandlerFunc, error) {
selector := simple.NewWithHistogramMeasure(config.DefaultHistogramBoundaries)
exporter, err := NewRawExporter(config)
@ -153,7 +153,7 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, h
return nil, nil, err
}
// Prometheus needs to use a stateful batcher since counters (and histogram since they are a collection of Counters)
// Prometheus needs to use a stateful integrator since counters (and histogram since they are a collection of Counters)
// are cumulative (i.e., monotonically increasing values) and should not be resetted after each export.
//
// Prometheus uses this approach to be resilient to scrape failures.
@ -161,8 +161,8 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, h
// it could try again on the next scrape and no data would be lost, only resolution.
//
// Gauges (or LastValues) and Summaries are an exception to this and have different behaviors.
batcher := ungrouped.New(selector, true)
pusher := push.New(batcher, exporter, period)
integrator := integrator.New(selector, true)
pusher := push.New(integrator, exporter, period)
pusher.Start()
return pusher, exporter.ServeHTTP, nil

View File

@ -29,8 +29,8 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)
@ -131,15 +131,15 @@ func InstallNewPipeline(config Config, opts ...push.Option) (*push.Controller, e
}
// NewExportPipeline sets up a complete export pipeline with the recommended setup,
// chaining a NewRawExporter into the recommended selectors and batchers.
// chaining a NewRawExporter into the recommended selectors and integrators.
func NewExportPipeline(config Config, period time.Duration, opts ...push.Option) (*push.Controller, error) {
selector := simple.NewWithExactMeasure()
exporter, err := NewRawExporter(config)
if err != nil {
return nil, err
}
batcher := ungrouped.New(selector, true)
pusher := push.New(batcher, exporter, period, opts...)
integrator := integrator.New(selector, true)
pusher := push.New(integrator, exporter, period, opts...)
pusher.Start()
return pusher, nil

View File

@ -32,8 +32,8 @@ import (
metricapi "go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/otlp"
exporttrace "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
@ -111,8 +111,8 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
}
selector := simple.NewWithExactMeasure()
batcher := ungrouped.New(selector, true)
pusher := push.New(batcher, exp, 60*time.Second)
integrator := integrator.New(selector, true)
pusher := push.New(integrator, exp, 60*time.Second)
pusher.Start()
ctx := context.Background()

View File

@ -22,7 +22,7 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
)
// Batcher is responsible for deciding which kind of aggregation to
// Integrator is responsible for deciding which kind of aggregation to
// use (via AggregationSelector), gathering exported results from the
// SDK during collection, and deciding over which dimensions to group
// the exported data.
@ -37,13 +37,13 @@ import (
//
// The `Process` method is called during collection in a
// single-threaded context from the SDK, after the aggregator is
// checkpointed, allowing the batcher to build the set of metrics
// checkpointed, allowing the integrator to build the set of metrics
// currently being exported.
//
// The `CheckpointSet` method is called during collection in a
// single-threaded context from the Exporter, giving the exporter
// access to a producer for iterating over the complete checkpoint.
type Batcher interface {
type Integrator interface {
// AggregationSelector is responsible for selecting the
// concrete type of Aggregator used for a metric in the SDK.
//
@ -77,8 +77,8 @@ type Batcher interface {
// The returned CheckpointSet is passed to the Exporter.
CheckpointSet() CheckpointSet
// FinishedCollection informs the Batcher that a complete
// collection round was completed. Stateless batchers might
// FinishedCollection informs the Integrator that a complete
// collection round was completed. Stateless integrators might
// reset state in this method, for example.
FinishedCollection()
}
@ -163,13 +163,13 @@ type Exporter interface {
// The Resource contains common attributes that apply to all
// metric events in the SDK.
//
// The CheckpointSet interface refers to the Batcher that just
// The CheckpointSet interface refers to the Integrator that just
// completed collection.
Export(context.Context, *resource.Resource, CheckpointSet) error
}
// CheckpointSet allows a controller to access a complete checkpoint of
// aggregated metrics from the Batcher. This is passed to the
// aggregated metrics from the Integrator. This is passed to the
// Exporter which may then use ForEach to iterate over the collection
// of aggregated metrics.
type CheckpointSet interface {
@ -192,7 +192,7 @@ type Record struct {
aggregator Aggregator
}
// NewRecord allows Batcher implementations to construct export
// NewRecord allows Integrator implementations to construct export
// records. The Descriptor, Labels, and Aggregator represent
// aggregate metric events received over a single collection period.
func NewRecord(descriptor *metric.Descriptor, labels *label.Set, aggregator Aggregator) Record {

View File

@ -36,10 +36,10 @@ import (
type processFunc func(context.Context, export.Record) error
type benchFixture struct {
meter metric.MeterMust
sdk *sdk.SDK
B *testing.B
pcb processFunc
meter metric.MeterMust
accumulator *sdk.Accumulator
B *testing.B
pcb processFunc
}
func newFixture(b *testing.B) *benchFixture {
@ -48,8 +48,8 @@ func newFixture(b *testing.B) *benchFixture {
B: b,
}
bf.sdk = sdk.New(bf)
bf.meter = metric.Must(metric.WrapMeterImpl(bf.sdk, "benchmarks"))
bf.accumulator = sdk.NewAccumulator(bf)
bf.meter = metric.Must(metric.WrapMeterImpl(bf.accumulator, "benchmarks"))
return bf
}
@ -223,7 +223,7 @@ func benchmarkIterator(b *testing.B, n int) {
cnt.Add(ctx, 1, makeLabels(n)...)
b.ResetTimer()
fix.sdk.Collect(ctx)
fix.accumulator.Collect(ctx)
}
func BenchmarkIterator_0(b *testing.B) {
@ -447,7 +447,7 @@ func BenchmarkObserverObservationInt64(b *testing.B) {
b.ResetTimer()
fix.sdk.Collect(ctx)
fix.accumulator.Collect(ctx)
}
func BenchmarkObserverObservationFloat64(b *testing.B) {
@ -462,7 +462,7 @@ func BenchmarkObserverObservationFloat64(b *testing.B) {
b.ResetTimer()
fix.sdk.Collect(ctx)
fix.accumulator.Collect(ctx)
}
// MaxSumCount
@ -536,7 +536,7 @@ func benchmarkBatchRecord8Labels(b *testing.B, numInst int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
fix.sdk.RecordBatch(ctx, labs, meas...)
fix.accumulator.RecordBatch(ctx, labs, meas...)
}
}
@ -574,7 +574,7 @@ func BenchmarkRepeatedDirectCalls(b *testing.B) {
for i := 0; i < b.N; i++ {
c.Add(ctx, 1, k)
fix.sdk.Collect(ctx)
fix.accumulator.Collect(ctx)
}
}
@ -595,7 +595,7 @@ func BenchmarkLabelIterator(b *testing.B) {
counter := fix.meter.NewInt64Counter("test.counter")
counter.Add(ctx, 1, keyValues...)
fix.sdk.Collect(ctx)
fix.accumulator.Collect(ctx)
b.ResetTimer()

View File

@ -30,12 +30,12 @@ import (
type Controller struct {
lock sync.Mutex
collectLock sync.Mutex
sdk *sdk.SDK
accumulator *sdk.Accumulator
resource *resource.Resource
uniq metric.MeterImpl
named map[string]metric.Meter
errorHandler sdk.ErrorHandler
batcher export.Batcher
integrator export.Integrator
exporter export.Exporter
wg sync.WaitGroup
ch chan struct{}
@ -70,23 +70,23 @@ var _ Clock = realClock{}
var _ Ticker = realTicker{}
// New constructs a Controller, an implementation of metric.Provider,
// using the provided batcher, exporter, collection period, and SDK
// using the provided integrator, exporter, collection period, and SDK
// configuration options to configure an SDK with periodic collection.
// The batcher itself is configured with the aggregation selector policy.
func New(batcher export.Batcher, exporter export.Exporter, period time.Duration, opts ...Option) *Controller {
// The integrator itself is configured with the aggregation selector policy.
func New(integrator export.Integrator, exporter export.Exporter, period time.Duration, opts ...Option) *Controller {
c := &Config{ErrorHandler: sdk.DefaultErrorHandler}
for _, opt := range opts {
opt.Apply(c)
}
impl := sdk.New(batcher, sdk.WithErrorHandler(c.ErrorHandler))
impl := sdk.NewAccumulator(integrator, sdk.WithErrorHandler(c.ErrorHandler))
return &Controller{
sdk: impl,
accumulator: impl,
resource: c.Resource,
uniq: registry.NewUniqueInstrumentMeterImpl(impl),
named: map[string]metric.Meter{},
errorHandler: c.ErrorHandler,
batcher: batcher,
integrator: integrator,
exporter: exporter,
ch: make(chan struct{}),
period: period,
@ -106,7 +106,7 @@ func (c *Controller) SetErrorHandler(errorHandler sdk.ErrorHandler) {
c.lock.Lock()
defer c.lock.Unlock()
c.errorHandler = errorHandler
c.sdk.SetErrorHandler(errorHandler)
c.accumulator.SetErrorHandler(errorHandler)
}
// Meter returns a named Meter, satisifying the metric.Provider
@ -176,10 +176,10 @@ func (c *Controller) tick() {
c.collect(ctx)
checkpointSet := syncCheckpointSet{
mtx: &c.collectLock,
delegate: c.batcher.CheckpointSet(),
delegate: c.integrator.CheckpointSet(),
}
err := c.exporter.Export(ctx, c.resource, checkpointSet)
c.batcher.FinishedCollection()
c.integrator.FinishedCollection()
if err != nil {
c.errorHandler(err)
@ -190,7 +190,7 @@ func (c *Controller) collect(ctx context.Context) {
c.collectLock.Lock()
defer c.collectLock.Unlock()
c.sdk.Collect(ctx)
c.accumulator.Collect(ctx)
}
// syncCheckpointSet is a wrapper for a CheckpointSet to synchronize

View File

@ -34,7 +34,7 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
)
type testBatcher struct {
type testIntegrator struct {
t *testing.T
lock sync.Mutex
checkpointSet *test.CheckpointSet
@ -52,7 +52,7 @@ type testExporter struct {
type testFixture struct {
checkpointSet *test.CheckpointSet
batcher *testBatcher
integrator *testIntegrator
exporter *testExporter
}
@ -70,7 +70,7 @@ var _ push.Ticker = mockTicker{}
func newFixture(t *testing.T) testFixture {
checkpointSet := test.NewCheckpointSet()
batcher := &testBatcher{
integrator := &testIntegrator{
t: t,
checkpointSet: checkpointSet,
}
@ -79,29 +79,29 @@ func newFixture(t *testing.T) testFixture {
}
return testFixture{
checkpointSet: checkpointSet,
batcher: batcher,
integrator: integrator,
exporter: exporter,
}
}
func (b *testBatcher) AggregatorFor(*metric.Descriptor) export.Aggregator {
func (b *testIntegrator) AggregatorFor(*metric.Descriptor) export.Aggregator {
return sum.New()
}
func (b *testBatcher) CheckpointSet() export.CheckpointSet {
func (b *testIntegrator) CheckpointSet() export.CheckpointSet {
b.lock.Lock()
defer b.lock.Unlock()
b.checkpoints++
return b.checkpointSet
}
func (b *testBatcher) FinishedCollection() {
func (b *testIntegrator) FinishedCollection() {
b.lock.Lock()
defer b.lock.Unlock()
b.finishes++
}
func (b *testBatcher) Process(_ context.Context, record export.Record) error {
func (b *testIntegrator) Process(_ context.Context, record export.Record) error {
b.lock.Lock()
defer b.lock.Unlock()
labels := record.Labels().ToSlice()
@ -109,7 +109,7 @@ func (b *testBatcher) Process(_ context.Context, record export.Record) error {
return nil
}
func (b *testBatcher) getCounts() (checkpoints, finishes int) {
func (b *testIntegrator) getCounts() (checkpoints, finishes int) {
b.lock.Lock()
defer b.lock.Unlock()
return b.checkpoints, b.finishes
@ -165,7 +165,7 @@ func (t mockTicker) C() <-chan time.Time {
func TestPushDoubleStop(t *testing.T) {
fix := newFixture(t)
p := push.New(fix.batcher, fix.exporter, time.Second)
p := push.New(fix.integrator, fix.exporter, time.Second)
p.Start()
p.Stop()
p.Stop()
@ -173,7 +173,7 @@ func TestPushDoubleStop(t *testing.T) {
func TestPushDoubleStart(t *testing.T) {
fix := newFixture(t)
p := push.New(fix.batcher, fix.exporter, time.Second)
p := push.New(fix.integrator, fix.exporter, time.Second)
p.Start()
p.Start()
p.Stop()
@ -182,7 +182,7 @@ func TestPushDoubleStart(t *testing.T) {
func TestPushTicker(t *testing.T) {
fix := newFixture(t)
p := push.New(fix.batcher, fix.exporter, time.Second)
p := push.New(fix.integrator, fix.exporter, time.Second)
meter := p.Meter("name")
mock := mockClock{clock.NewMock()}
@ -197,7 +197,7 @@ func TestPushTicker(t *testing.T) {
counter.Add(ctx, 3)
records, exports := fix.exporter.resetRecords()
checkpoints, finishes := fix.batcher.getCounts()
checkpoints, finishes := fix.integrator.getCounts()
require.Equal(t, 0, checkpoints)
require.Equal(t, 0, finishes)
require.Equal(t, 0, exports)
@ -207,7 +207,7 @@ func TestPushTicker(t *testing.T) {
runtime.Gosched()
records, exports = fix.exporter.resetRecords()
checkpoints, finishes = fix.batcher.getCounts()
checkpoints, finishes = fix.integrator.getCounts()
require.Equal(t, 1, checkpoints)
require.Equal(t, 1, finishes)
require.Equal(t, 1, exports)
@ -226,7 +226,7 @@ func TestPushTicker(t *testing.T) {
runtime.Gosched()
records, exports = fix.exporter.resetRecords()
checkpoints, finishes = fix.batcher.getCounts()
checkpoints, finishes = fix.integrator.getCounts()
require.Equal(t, 2, checkpoints)
require.Equal(t, 2, finishes)
require.Equal(t, 2, exports)
@ -265,7 +265,7 @@ func TestPushExportError(t *testing.T) {
fix := newFixture(t)
fix.exporter.injectErr = injector("counter1", tt.injectedError)
p := push.New(fix.batcher, fix.exporter, time.Second)
p := push.New(fix.integrator, fix.exporter, time.Second)
var err error
var lock sync.Mutex
@ -297,7 +297,7 @@ func TestPushExportError(t *testing.T) {
runtime.Gosched()
records, exports := fix.exporter.resetRecords()
checkpoints, finishes := fix.batcher.getCounts()
checkpoints, finishes := fix.integrator.getCounts()
require.Equal(t, 1, exports)
require.Equal(t, 1, checkpoints)
require.Equal(t, 1, finishes)

View File

@ -33,12 +33,12 @@ import (
metricsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
batchTest "go.opentelemetry.io/otel/sdk/metric/batcher/test"
batchTest "go.opentelemetry.io/otel/sdk/metric/integrator/test"
)
var Must = metric.Must
type correctnessBatcher struct {
type correctnessIntegrator struct {
newAggCount int64
t *testing.T
@ -46,7 +46,7 @@ type correctnessBatcher struct {
records []export.Record
}
func (cb *correctnessBatcher) AggregatorFor(descriptor *metric.Descriptor) (agg export.Aggregator) {
func (cb *correctnessIntegrator) AggregatorFor(descriptor *metric.Descriptor) (agg export.Aggregator) {
name := descriptor.Name()
switch {
@ -63,25 +63,25 @@ func (cb *correctnessBatcher) AggregatorFor(descriptor *metric.Descriptor) (agg
return
}
func (cb *correctnessBatcher) CheckpointSet() export.CheckpointSet {
func (cb *correctnessIntegrator) CheckpointSet() export.CheckpointSet {
cb.t.Fatal("Should not be called")
return nil
}
func (*correctnessBatcher) FinishedCollection() {
func (*correctnessIntegrator) FinishedCollection() {
}
func (cb *correctnessBatcher) Process(_ context.Context, record export.Record) error {
func (cb *correctnessIntegrator) Process(_ context.Context, record export.Record) error {
cb.records = append(cb.records, record)
return nil
}
func TestInputRangeTestCounter(t *testing.T) {
ctx := context.Background()
batcher := &correctnessBatcher{
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.New(batcher)
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
var sdkErr error
@ -98,10 +98,10 @@ func TestInputRangeTestCounter(t *testing.T) {
checkpointed := sdk.Collect(ctx)
require.Equal(t, 0, checkpointed)
batcher.records = nil
integrator.records = nil
counter.Add(ctx, 1)
checkpointed = sdk.Collect(ctx)
sum, err := batcher.records[0].Aggregator().(aggregator.Sum).Sum()
sum, err := integrator.records[0].Aggregator().(aggregator.Sum).Sum()
require.Equal(t, int64(1), sum.AsInt64())
require.Equal(t, 1, checkpointed)
require.Nil(t, err)
@ -110,10 +110,10 @@ func TestInputRangeTestCounter(t *testing.T) {
func TestInputRangeTestMeasure(t *testing.T) {
ctx := context.Background()
batcher := &correctnessBatcher{
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.New(batcher)
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
var sdkErr error
@ -133,10 +133,10 @@ func TestInputRangeTestMeasure(t *testing.T) {
measure.Record(ctx, 1)
measure.Record(ctx, 2)
batcher.records = nil
integrator.records = nil
checkpointed = sdk.Collect(ctx)
count, err := batcher.records[0].Aggregator().(aggregator.Distribution).Count()
count, err := integrator.records[0].Aggregator().(aggregator.Distribution).Count()
require.Equal(t, int64(2), count)
require.Equal(t, 1, checkpointed)
require.Nil(t, sdkErr)
@ -145,10 +145,10 @@ func TestInputRangeTestMeasure(t *testing.T) {
func TestDisabledInstrument(t *testing.T) {
ctx := context.Background()
batcher := &correctnessBatcher{
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.New(batcher)
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
measure := Must(meter).NewFloat64Measure("name.disabled")
@ -157,16 +157,16 @@ func TestDisabledInstrument(t *testing.T) {
checkpointed := sdk.Collect(ctx)
require.Equal(t, 0, checkpointed)
require.Equal(t, 0, len(batcher.records))
require.Equal(t, 0, len(integrator.records))
}
func TestRecordNaN(t *testing.T) {
ctx := context.Background()
batcher := &correctnessBatcher{
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.New(batcher)
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
var sdkErr error
@ -182,10 +182,10 @@ func TestRecordNaN(t *testing.T) {
func TestSDKLabelsDeduplication(t *testing.T) {
ctx := context.Background()
batcher := &correctnessBatcher{
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.New(batcher)
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
counter := Must(meter).NewInt64Counter("counter")
@ -238,7 +238,7 @@ func TestSDKLabelsDeduplication(t *testing.T) {
sdk.Collect(ctx)
var actual [][]core.KeyValue
for _, rec := range batcher.records {
for _, rec := range integrator.records {
sum, _ := rec.Aggregator().(aggregator.Sum).Sum()
require.Equal(t, sum, metric.NewInt64Number(2))
@ -285,11 +285,11 @@ func TestDefaultLabelEncoder(t *testing.T) {
func TestObserverCollection(t *testing.T) {
ctx := context.Background()
batcher := &correctnessBatcher{
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.New(batcher)
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
_ = Must(meter).RegisterFloat64Observer("float.observer", func(result metric.Float64ObserverResult) {
@ -311,10 +311,10 @@ func TestObserverCollection(t *testing.T) {
collected := sdk.Collect(ctx)
require.Equal(t, 4, collected)
require.Equal(t, 4, len(batcher.records))
require.Equal(t, 4, len(integrator.records))
out := batchTest.NewOutput(label.DefaultEncoder())
for _, rec := range batcher.records {
for _, rec := range integrator.records {
_ = out.AddTo(rec)
}
require.EqualValues(t, map[string]float64{
@ -327,11 +327,11 @@ func TestObserverCollection(t *testing.T) {
func TestRecordBatch(t *testing.T) {
ctx := context.Background()
batcher := &correctnessBatcher{
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.New(batcher)
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
counter1 := Must(meter).NewInt64Counter("int64.counter")
@ -354,7 +354,7 @@ func TestRecordBatch(t *testing.T) {
sdk.Collect(ctx)
out := batchTest.NewOutput(label.DefaultEncoder())
for _, rec := range batcher.records {
for _, rec := range integrator.records {
_ = out.AddTo(rec)
}
require.EqualValues(t, map[string]float64{
@ -370,11 +370,11 @@ func TestRecordBatch(t *testing.T) {
// that its encoded labels will be cached across collection intervals.
func TestRecordPersistence(t *testing.T) {
ctx := context.Background()
batcher := &correctnessBatcher{
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.New(batcher)
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
c := Must(meter).NewFloat64Counter("sum.name")
@ -387,5 +387,5 @@ func TestRecordPersistence(t *testing.T) {
sdk.Collect(ctx)
}
require.Equal(t, int64(2), batcher.newAggCount)
require.Equal(t, int64(2), integrator.newAggCount)
}

View File

@ -124,22 +124,22 @@ Aggregators implement a Merge method, also called in collection
context, that combines state from two aggregators into one. Each SDK
record has an associated aggregator.
Batcher is an interface which sits between the SDK and an exporter.
The Batcher embeds an AggregationSelector, used by the SDK to assign
new Aggregators. The Batcher supports a Process() API for submitting
checkpointed aggregators to the batcher, and a CheckpointSet() API
Integrator is an interface which sits between the SDK and an exporter.
The Integrator embeds an AggregationSelector, used by the SDK to assign
new Aggregators. The Integrator supports a Process() API for submitting
checkpointed aggregators to the integrator, and a CheckpointSet() API
for producing a complete checkpoint for the exporter. Two default
Batcher implementations are provided, the "defaultkeys" Batcher groups
Integrator implementations are provided, the "defaultkeys" Integrator groups
aggregate metrics by their recommended Descriptor.Keys(), the
"ungrouped" Batcher aggregates metrics at full dimensionality.
"simple" Integrator aggregates metrics at full dimensionality.
LabelEncoder is an optional optimization that allows an exporter to
provide the serialization logic for labels. This allows avoiding
duplicate serialization of labels, once as a unique key in the SDK (or
Batcher) and once in the exporter.
Integrator) and once in the exporter.
CheckpointSet is an interface between the Batcher and the Exporter.
After completing a collection pass, the Batcher.CheckpointSet() method
CheckpointSet is an interface between the Integrator and the Exporter.
After completing a collection pass, the Integrator.CheckpointSet() method
returns a CheckpointSet, which the Exporter uses to iterate over all
the updated metrics.

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package ungrouped // import "go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
package simple // import "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
import (
"context"
@ -25,7 +25,7 @@ import (
)
type (
Batcher struct {
Integrator struct {
selector export.AggregationSelector
batchMap batchMap
stateful bool
@ -44,22 +44,22 @@ type (
batchMap map[batchKey]batchValue
)
var _ export.Batcher = &Batcher{}
var _ export.Integrator = &Integrator{}
var _ export.CheckpointSet = batchMap{}
func New(selector export.AggregationSelector, stateful bool) *Batcher {
return &Batcher{
func New(selector export.AggregationSelector, stateful bool) *Integrator {
return &Integrator{
selector: selector,
batchMap: batchMap{},
stateful: stateful,
}
}
func (b *Batcher) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
func (b *Integrator) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
return b.selector.AggregatorFor(descriptor)
}
func (b *Batcher) Process(_ context.Context, record export.Record) error {
func (b *Integrator) Process(_ context.Context, record export.Record) error {
desc := record.Descriptor()
key := batchKey{
descriptor: desc,
@ -70,12 +70,12 @@ func (b *Batcher) Process(_ context.Context, record export.Record) error {
if ok {
// Note: The call to Merge here combines only
// identical records. It is required even for a
// stateless Batcher because such identical records
// stateless Integrator because such identical records
// may arise in the Meter implementation due to race
// conditions.
return value.aggregator.Merge(agg, desc)
}
// If this Batcher is stateful, create a copy of the
// If this integrator is stateful, create a copy of the
// Aggregator for long-term storage. Otherwise the
// Meter implementation will checkpoint the aggregator
// again, overwriting the long-lived state.
@ -95,11 +95,11 @@ func (b *Batcher) Process(_ context.Context, record export.Record) error {
return nil
}
func (b *Batcher) CheckpointSet() export.CheckpointSet {
func (b *Integrator) CheckpointSet() export.CheckpointSet {
return b.batchMap
}
func (b *Batcher) FinishedCollection() {
func (b *Integrator) FinishedCollection() {
if !b.stateful {
b.batchMap = batchMap{}
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package ungrouped_test
package simple_test
import (
"context"
@ -23,15 +23,15 @@ import (
"github.com/stretchr/testify/require"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/batcher/test"
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
"go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
)
// These tests use the ../test label encoding.
func TestUngroupedStateless(t *testing.T) {
ctx := context.Background()
b := ungrouped.New(test.NewAggregationSelector(), false)
b := simple.New(test.NewAggregationSelector(), false)
// Set initial lastValue values
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 10))
@ -93,7 +93,7 @@ func TestUngroupedStateless(t *testing.T) {
func TestUngroupedStateful(t *testing.T) {
ctx := context.Background()
b := ungrouped.New(test.NewAggregationSelector(), true)
b := simple.New(test.NewAggregationSelector(), true)
counterA := test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10)
caggA := counterA.Aggregator()
@ -129,7 +129,7 @@ func TestUngroupedStateful(t *testing.T) {
caggA.Checkpoint(ctx, &test.CounterADesc)
caggB.Checkpoint(ctx, &test.CounterBDesc)
// As yet cagg has not been passed to Batcher.Process. Should
// As yet cagg has not been passed to Integrator.Process. Should
// not see an update.
checkpointSet = b.CheckpointSet()
b.FinishedCollection()

View File

@ -41,7 +41,7 @@ type (
// testAggregationSelector returns aggregators consistent with
// the test variables below, needed for testing stateful
// batchers, which clone Aggregators using AggregatorFor(desc).
// integrators, which clone Aggregators using AggregatorFor(desc).
testAggregationSelector struct{}
)

View File

@ -31,15 +31,16 @@ import (
)
type (
// SDK implements the OpenTelemetry Meter API. The SDK is
// bound to a single export.Batcher in `New()`.
// Accumulator implements the OpenTelemetry Meter API. The
// Accumulator is bound to a single export.Integrator in
// `NewAccumulator()`.
//
// The SDK supports a Collect() API to gather and export
// The Accumulator supports a Collect() API to gather and export
// current data. Collect() should be arranged according to
// the batcher model. Push-based batchers will setup a
// timer to call Collect() periodically. Pull-based batchers
// the integrator model. Push-based integrators will setup a
// timer to call Collect() periodically. Pull-based integrators
// will call Collect() when a pull request arrives.
SDK struct {
Accumulator struct {
// current maps `mapkey` to *record.
current sync.Map
@ -51,8 +52,8 @@ type (
// incremented in `Collect()`.
currentEpoch int64
// batcher is the configured batcher+configuration.
batcher export.Batcher
// integrator is the configured integrator+configuration.
integrator export.Integrator
// collectLock prevents simultaneous calls to Collect().
collectLock sync.Mutex
@ -80,10 +81,10 @@ type (
// record maintains the state of one metric instrument. Due
// the use of lock-free algorithms, there may be more than one
// `record` in existence at a time, although at most one can
// be referenced from the `SDK.current` map.
// be referenced from the `Accumulator.current` map.
record struct {
// refMapped keeps track of refcounts and the mapping state to the
// SDK.current map.
// Accumulator.current map.
refMapped refcountMapped
// updateCount is incremented on every Update.
@ -119,7 +120,7 @@ type (
}
instrument struct {
meter *SDK
meter *Accumulator
descriptor metric.Descriptor
}
@ -142,7 +143,7 @@ type (
)
var (
_ api.MeterImpl = &SDK{}
_ api.MeterImpl = &Accumulator{}
_ api.AsyncImpl = &asyncInstrument{}
_ api.SyncImpl = &syncInstrument{}
_ api.BoundSyncImpl = &record{}
@ -188,14 +189,14 @@ func (a *asyncInstrument) getRecorder(kvs []core.KeyValue) export.Aggregator {
if lrec.observedEpoch == a.meter.currentEpoch {
// last value wins for Observers, so if we see the same labels
// in the current epoch, we replace the old recorder
lrec.recorder = a.meter.batcher.AggregatorFor(&a.descriptor)
lrec.recorder = a.meter.integrator.AggregatorFor(&a.descriptor)
} else {
lrec.observedEpoch = a.meter.currentEpoch
}
a.recorders[labels.Equivalent()] = lrec
return lrec.recorder
}
rec := a.meter.batcher.AggregatorFor(&a.descriptor)
rec := a.meter.integrator.AggregatorFor(&a.descriptor)
if a.recorders == nil {
a.recorders = make(map[label.Distinct]*labeledRecorder)
}
@ -210,7 +211,7 @@ func (a *asyncInstrument) getRecorder(kvs []core.KeyValue) export.Aggregator {
return rec
}
func (m *SDK) SetErrorHandler(f ErrorHandler) {
func (m *Accumulator) SetErrorHandler(f ErrorHandler) {
m.errorHandler = f
}
@ -259,7 +260,7 @@ func (s *syncInstrument) acquireHandle(kvs []core.KeyValue, labelPtr *label.Set)
}
rec.refMapped = refcountMapped{value: 2}
rec.inst = s
rec.recorder = s.meter.batcher.AggregatorFor(&s.descriptor)
rec.recorder = s.meter.integrator.AggregatorFor(&s.descriptor)
for {
// Load/Store: there's a memory allocation to place `mk` into
@ -301,32 +302,32 @@ func (s *syncInstrument) RecordOne(ctx context.Context, number api.Number, kvs [
h.RecordOne(ctx, number)
}
// New constructs a new SDK for the given batcher. This SDK supports
// only a single batcher.
// NewAccumulator constructs a new Accumulator for the given
// integrator. This Accumulator supports only a single integrator.
//
// The SDK does not start any background process to collect itself
// periodically, this responsbility lies with the batcher, typically,
// The Accumulator does not start any background process to collect itself
// periodically, this responsbility lies with the integrator, typically,
// depending on the type of export. For example, a pull-based
// batcher will call Collect() when it receives a request to scrape
// current metric values. A push-based batcher should configure its
// integrator will call Collect() when it receives a request to scrape
// current metric values. A push-based integrator should configure its
// own periodic collection.
func New(batcher export.Batcher, opts ...Option) *SDK {
func NewAccumulator(integrator export.Integrator, opts ...Option) *Accumulator {
c := &Config{ErrorHandler: DefaultErrorHandler}
for _, opt := range opts {
opt.Apply(c)
}
return &SDK{
batcher: batcher,
return &Accumulator{
integrator: integrator,
errorHandler: c.ErrorHandler,
}
}
func DefaultErrorHandler(err error) {
fmt.Fprintln(os.Stderr, "Metrics SDK error:", err)
fmt.Fprintln(os.Stderr, "Metrics Accumulator error:", err)
}
func (m *SDK) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl, error) {
func (m *Accumulator) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl, error) {
return &syncInstrument{
instrument: instrument{
descriptor: descriptor,
@ -335,7 +336,7 @@ func (m *SDK) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl, error)
}, nil
}
func (m *SDK) NewAsyncInstrument(descriptor api.Descriptor, callback func(func(api.Number, []core.KeyValue))) (api.AsyncImpl, error) {
func (m *Accumulator) NewAsyncInstrument(descriptor api.Descriptor, callback func(func(api.Number, []core.KeyValue))) (api.AsyncImpl, error) {
a := &asyncInstrument{
instrument: instrument{
descriptor: descriptor,
@ -351,11 +352,11 @@ func (m *SDK) NewAsyncInstrument(descriptor api.Descriptor, callback func(func(a
// exports data for each active instrument. Collect() may not be
// called concurrently.
//
// During the collection pass, the export.Batcher will receive
// During the collection pass, the export.Integrator will receive
// one Export() call per current aggregation.
//
// Returns the number of records that were checkpointed.
func (m *SDK) Collect(ctx context.Context) int {
func (m *Accumulator) Collect(ctx context.Context) int {
m.collectLock.Lock()
defer m.collectLock.Unlock()
@ -365,7 +366,7 @@ func (m *SDK) Collect(ctx context.Context) int {
return checkpointed
}
func (m *SDK) collectRecords(ctx context.Context) int {
func (m *Accumulator) collectRecords(ctx context.Context) int {
checkpointed := 0
m.current.Range(func(key interface{}, value interface{}) bool {
@ -408,7 +409,7 @@ func (m *SDK) collectRecords(ctx context.Context) int {
return checkpointed
}
func (m *SDK) collectAsync(ctx context.Context) int {
func (m *Accumulator) collectAsync(ctx context.Context) int {
checkpointed := 0
m.asyncInstruments.Range(func(key, value interface{}) bool {
@ -421,11 +422,11 @@ func (m *SDK) collectAsync(ctx context.Context) int {
return checkpointed
}
func (m *SDK) checkpointRecord(ctx context.Context, r *record) int {
func (m *Accumulator) checkpointRecord(ctx context.Context, r *record) int {
return m.checkpoint(ctx, &r.inst.descriptor, r.recorder, r.labels)
}
func (m *SDK) checkpointAsync(ctx context.Context, a *asyncInstrument) int {
func (m *Accumulator) checkpointAsync(ctx context.Context, a *asyncInstrument) int {
if len(a.recorders) == 0 {
return 0
}
@ -448,14 +449,14 @@ func (m *SDK) checkpointAsync(ctx context.Context, a *asyncInstrument) int {
return checkpointed
}
func (m *SDK) checkpoint(ctx context.Context, descriptor *metric.Descriptor, recorder export.Aggregator, labels *label.Set) int {
func (m *Accumulator) checkpoint(ctx context.Context, descriptor *metric.Descriptor, recorder export.Aggregator, labels *label.Set) int {
if recorder == nil {
return 0
}
recorder.Checkpoint(ctx, descriptor)
exportRecord := export.NewRecord(descriptor, labels, recorder)
err := m.batcher.Process(ctx, exportRecord)
err := m.integrator.Process(ctx, exportRecord)
if err != nil {
m.errorHandler(err)
}
@ -463,7 +464,7 @@ func (m *SDK) checkpoint(ctx context.Context, descriptor *metric.Descriptor, rec
}
// RecordBatch enters a batch of metric events.
func (m *SDK) RecordBatch(ctx context.Context, kvs []core.KeyValue, measurements ...api.Measurement) {
func (m *Accumulator) RecordBatch(ctx context.Context, kvs []core.KeyValue, measurements ...api.Measurement) {
// Labels will be computed the first time acquireHandle is
// called. Subsequent calls to acquireHandle will re-use the
// previously computed value instead of recomputing the

View File

@ -157,7 +157,7 @@ func (f *testFixture) someLabels() []core.KeyValue {
}
}
func (f *testFixture) startWorker(impl *SDK, meter api.Meter, wg *sync.WaitGroup, i int) {
func (f *testFixture) startWorker(impl *Accumulator, meter api.Meter, wg *sync.WaitGroup, i int) {
ctx := context.Background()
name := fmt.Sprint("test_", i)
instrument := f.impl.newInstrument(meter, name)
@ -307,7 +307,7 @@ func stressTest(t *testing.T, impl testImpl) {
lused: map[string]bool{},
}
cc := concurrency()
sdk := New(fixture)
sdk := NewAccumulator(fixture)
meter := metric.WrapMeterImpl(sdk, "stress_test")
fixture.wg.Add(cc + 1)