1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-07-17 01:12:45 +02:00

Create a export.Checkpointer API; refactor metric export pipeline test helpers (#1055)

* Add regexp filter in api/label, test

* Add regexp option to sdk.Config

* Return indistinct values only when keyRe != nil

* Filter in sdk

* Add an accumulator filter test

* SDK tests pass

* Precommit

* Undo set filters

* Backout related filter changes

* Add a new test

* Checkpoint

* Comments

* Comments in label.Set

* Lint

* Add Checkpointer

* Test refactor continues

* Refactor reducer test

* Checkpoint

* Update push_test

* Update pull controller

* Comment

* Remove pending PRs

* Remove exportertest pkg

* Revert basic changes

* Revert testing changes

* Restore processortest changes

* Precommit & comments

* Comments on pull semantics

* Comments

* Fix buggy test; incorrect expectation following error

* Finish this test

* Comments

* Apply suggestions from code review

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
Joshua MacDonald
2020-08-13 13:12:32 -07:00
committed by GitHub
parent b7df5543e3
commit 1cdf4ee8e6
13 changed files with 500 additions and 207 deletions

View File

@ -31,6 +31,7 @@ import (
apitrace "go.opentelemetry.io/otel/api/trace" apitrace "go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/exporters/otlp" "go.opentelemetry.io/otel/exporters/otlp"
"go.opentelemetry.io/otel/sdk/metric/controller/push" "go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple" "go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace" sdktrace "go.opentelemetry.io/otel/sdk/trace"
@ -64,7 +65,10 @@ func initProvider() (*otlp.Exporter, *push.Controller) {
handleErr(err, "failed to create trace provider") handleErr(err, "failed to create trace provider")
pusher := push.New( pusher := push.New(
simple.NewWithExactDistribution(), basic.New(
simple.NewWithExactDistribution(),
exp,
),
exp, exp,
push.WithPeriod(2*time.Second), push.WithPeriod(2*time.Second),
) )

View File

@ -29,6 +29,7 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric" export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/controller/pull" "go.opentelemetry.io/otel/sdk/metric/controller/pull"
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple" "go.opentelemetry.io/otel/sdk/metric/selector/simple"
) )
@ -144,8 +145,11 @@ func (e *Exporter) SetController(config Config, options ...pull.Option) {
defer e.lock.Unlock() defer e.lock.Unlock()
e.controller = pull.New( e.controller = pull.New(
simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries), basic.New(
e, simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries),
e,
basic.WithMemory(true),
),
options..., options...,
) )
} }

View File

@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/trace" "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/metric/controller/push" "go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple" "go.opentelemetry.io/otel/sdk/metric/selector/simple"
sdktrace "go.opentelemetry.io/otel/sdk/trace" sdktrace "go.opentelemetry.io/otel/sdk/trace"
) )
@ -62,7 +63,10 @@ func NewExportPipeline(exportOpts []Option, pushOpts []push.Option) (apitrace.Pr
} }
pusher := push.New( pusher := push.New(
simple.NewWithExactDistribution(), basic.New(
simple.NewWithExactDistribution(),
exporter,
),
exporter, exporter,
pushOpts..., pushOpts...,
) )

View File

@ -97,6 +97,33 @@ type AggregatorSelector interface {
AggregatorFor(*metric.Descriptor, ...*Aggregator) AggregatorFor(*metric.Descriptor, ...*Aggregator)
} }
// Checkpointer is the interface used by a Controller to coordinate
// the Processor with Accumulator(s) and Exporter(s). The
// StartCollection() and FinishCollection() methods start and finish a
// collection interval. Controllers call the Accumulator(s) during
// collection to process Accumulations.
type Checkpointer interface {
// Processor processes metric data for export. The Process
// method is bracketed by StartCollection and FinishCollection
// calls. The embedded AggregatorSelector can be called at
// any time.
Processor
// CheckpointSet returns the current data set. This may be
// called before and after collection. The
// implementation is required to return the same value
// throughout its lifetime, since CheckpointSet exposes a
// sync.Locker interface. The caller is responsible for
// locking the CheckpointSet before initiating collection.
CheckpointSet() CheckpointSet
// StartCollection begins a collection interval.
StartCollection()
// FinishCollection ends a collection interval.
FinishCollection() error
}
// Aggregator implements a specific aggregation behavior, e.g., a // Aggregator implements a specific aggregation behavior, e.g., a
// behavior to track a sequence of updates to an instrument. Sum-only // behavior to track a sequence of updates to an instrument. Sum-only
// instruments commonly use a simple Sum aggregator, but for the // instruments commonly use a simple Sum aggregator, but for the

View File

@ -35,6 +35,7 @@ type mapkey struct {
} }
// CheckpointSet is useful for testing Exporters. // CheckpointSet is useful for testing Exporters.
// TODO(#872): Uses of this can be replaced by processortest.Output.
type CheckpointSet struct { type CheckpointSet struct {
sync.RWMutex sync.RWMutex
records map[mapkey]export.Record records map[mapkey]export.Record

View File

@ -23,7 +23,6 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric" export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric" sdk "go.opentelemetry.io/otel/sdk/metric"
controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time" controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
) )
@ -35,17 +34,23 @@ const DefaultCachePeriod time.Duration = 10 * time.Second
// *basic.Processor. Use Provider() for obtaining Meters. Use // *basic.Processor. Use Provider() for obtaining Meters. Use
// Foreach() for accessing current records. // Foreach() for accessing current records.
type Controller struct { type Controller struct {
accumulator *sdk.Accumulator accumulator *sdk.Accumulator
processor *processor.Processor checkpointer export.Checkpointer
provider *registry.Provider provider *registry.Provider
period time.Duration period time.Duration
lastCollect time.Time lastCollect time.Time
clock controllerTime.Clock clock controllerTime.Clock
checkpoint export.CheckpointSet checkpoint export.CheckpointSet
} }
// New returns a *Controller configured with an aggregation selector and options. // New returns a *Controller configured with an export.Checkpointer.
func New(aselector export.AggregatorSelector, eselector export.ExportKindSelector, options ...Option) *Controller { //
// Pull controllers are typically used in an environment where there
// are multiple readers. It is common, therefore, when configuring a
// basic Processor for use with this controller, to use a
// CumulativeExport strategy and the basic.WithMemory(true) option,
// which ensures that every CheckpointSet includes full state.
func New(checkpointer export.Checkpointer, options ...Option) *Controller {
config := &Config{ config := &Config{
Resource: resource.Empty(), Resource: resource.Empty(),
CachePeriod: DefaultCachePeriod, CachePeriod: DefaultCachePeriod,
@ -53,27 +58,24 @@ func New(aselector export.AggregatorSelector, eselector export.ExportKindSelecto
for _, opt := range options { for _, opt := range options {
opt.Apply(config) opt.Apply(config)
} }
// This controller uses WithMemory() as a requirement to
// support multiple readers.
processor := processor.New(aselector, eselector, processor.WithMemory(true))
accum := sdk.NewAccumulator( accum := sdk.NewAccumulator(
processor, checkpointer,
sdk.WithResource(config.Resource), sdk.WithResource(config.Resource),
) )
return &Controller{ return &Controller{
accumulator: accum, accumulator: accum,
processor: processor, checkpointer: checkpointer,
provider: registry.NewProvider(accum), provider: registry.NewProvider(accum),
period: config.CachePeriod, period: config.CachePeriod,
checkpoint: processor.CheckpointSet(), checkpoint: checkpointer.CheckpointSet(),
clock: controllerTime.RealClock{}, clock: controllerTime.RealClock{},
} }
} }
// SetClock sets the clock used for caching. For testing purposes. // SetClock sets the clock used for caching. For testing purposes.
func (c *Controller) SetClock(clock controllerTime.Clock) { func (c *Controller) SetClock(clock controllerTime.Clock) {
c.processor.Lock() c.checkpointer.CheckpointSet().Lock()
defer c.processor.Unlock() defer c.checkpointer.CheckpointSet().Unlock()
c.clock = clock c.clock = clock
} }
@ -86,8 +88,8 @@ func (c *Controller) Provider() metric.Provider {
// Foreach gives the caller read-locked access to the current // Foreach gives the caller read-locked access to the current
// export.CheckpointSet. // export.CheckpointSet.
func (c *Controller) ForEach(ks export.ExportKindSelector, f func(export.Record) error) error { func (c *Controller) ForEach(ks export.ExportKindSelector, f func(export.Record) error) error {
c.processor.RLock() c.checkpointer.CheckpointSet().RLock()
defer c.processor.RUnlock() defer c.checkpointer.CheckpointSet().RUnlock()
return c.checkpoint.ForEach(ks, f) return c.checkpoint.ForEach(ks, f)
} }
@ -95,8 +97,8 @@ func (c *Controller) ForEach(ks export.ExportKindSelector, f func(export.Record)
// Collect requests a collection. The collection will be skipped if // Collect requests a collection. The collection will be skipped if
// the last collection is aged less than the CachePeriod. // the last collection is aged less than the CachePeriod.
func (c *Controller) Collect(ctx context.Context) error { func (c *Controller) Collect(ctx context.Context) error {
c.processor.Lock() c.checkpointer.CheckpointSet().Lock()
defer c.processor.Unlock() defer c.checkpointer.CheckpointSet().Unlock()
if c.period > 0 { if c.period > 0 {
now := c.clock.Now() now := c.clock.Now()
@ -108,9 +110,9 @@ func (c *Controller) Collect(ctx context.Context) error {
c.lastCollect = now c.lastCollect = now
} }
c.processor.StartCollection() c.checkpointer.StartCollection()
c.accumulator.Collect(ctx) c.accumulator.Collect(ctx)
err := c.processor.FinishCollection() err := c.checkpointer.FinishCollection()
c.checkpoint = c.processor.CheckpointSet() c.checkpoint = c.checkpointer.CheckpointSet()
return err return err
} }

View File

@ -28,14 +28,18 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric" export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/controller/controllertest" "go.opentelemetry.io/otel/sdk/metric/controller/controllertest"
"go.opentelemetry.io/otel/sdk/metric/controller/pull" "go.opentelemetry.io/otel/sdk/metric/controller/pull"
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/processor/processortest" "go.opentelemetry.io/otel/sdk/metric/processor/processortest"
selector "go.opentelemetry.io/otel/sdk/metric/selector/simple" selector "go.opentelemetry.io/otel/sdk/metric/selector/simple"
) )
func TestPullNoCache(t *testing.T) { func TestPullNoCache(t *testing.T) {
puller := pull.New( puller := pull.New(
selector.NewWithExactDistribution(), basic.New(
export.CumulativeExporter, selector.NewWithExactDistribution(),
export.CumulativeExporter,
basic.WithMemory(true),
),
pull.WithCachePeriod(0), pull.WithCachePeriod(0),
) )
@ -66,8 +70,11 @@ func TestPullNoCache(t *testing.T) {
func TestPullWithCache(t *testing.T) { func TestPullWithCache(t *testing.T) {
puller := pull.New( puller := pull.New(
selector.NewWithExactDistribution(), basic.New(
export.CumulativeExporter, selector.NewWithExactDistribution(),
export.CumulativeExporter,
basic.WithMemory(true),
),
pull.WithCachePeriod(time.Second), pull.WithCachePeriod(time.Second),
) )
mock := controllertest.NewMockClock() mock := controllertest.NewMockClock()

View File

@ -25,7 +25,6 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric" export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric" sdk "go.opentelemetry.io/otel/sdk/metric"
controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time" controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time"
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
) )
// DefaultPushPeriod is the default time interval between pushes. // DefaultPushPeriod is the default time interval between pushes.
@ -33,23 +32,23 @@ const DefaultPushPeriod = 10 * time.Second
// Controller organizes a periodic push of metric data. // Controller organizes a periodic push of metric data.
type Controller struct { type Controller struct {
lock sync.Mutex lock sync.Mutex
accumulator *sdk.Accumulator accumulator *sdk.Accumulator
provider *registry.Provider provider *registry.Provider
processor *basic.Processor checkpointer export.Checkpointer
exporter export.Exporter exporter export.Exporter
wg sync.WaitGroup wg sync.WaitGroup
ch chan struct{} ch chan struct{}
period time.Duration period time.Duration
timeout time.Duration timeout time.Duration
clock controllerTime.Clock clock controllerTime.Clock
ticker controllerTime.Ticker ticker controllerTime.Ticker
} }
// New constructs a Controller, an implementation of metric.Provider, // New constructs a Controller, an implementation of metric.Provider,
// using the provided exporter and options to configure an SDK with // using the provided checkpointer, exporter, and options to configure
// periodic collection. // an SDK with periodic collection.
func New(selector export.AggregatorSelector, exporter export.Exporter, opts ...Option) *Controller { func New(checkpointer export.Checkpointer, exporter export.Exporter, opts ...Option) *Controller {
c := &Config{ c := &Config{
Period: DefaultPushPeriod, Period: DefaultPushPeriod,
} }
@ -60,20 +59,19 @@ func New(selector export.AggregatorSelector, exporter export.Exporter, opts ...O
c.Timeout = c.Period c.Timeout = c.Period
} }
processor := basic.New(selector, exporter)
impl := sdk.NewAccumulator( impl := sdk.NewAccumulator(
processor, checkpointer,
sdk.WithResource(c.Resource), sdk.WithResource(c.Resource),
) )
return &Controller{ return &Controller{
provider: registry.NewProvider(impl), provider: registry.NewProvider(impl),
accumulator: impl, accumulator: impl,
processor: processor, checkpointer: checkpointer,
exporter: exporter, exporter: exporter,
ch: make(chan struct{}), ch: make(chan struct{}),
period: c.Period, period: c.Period,
timeout: c.Timeout, timeout: c.Timeout,
clock: controllerTime.RealClock{}, clock: controllerTime.RealClock{},
} }
} }
@ -139,16 +137,17 @@ func (c *Controller) tick() {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout) ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel() defer cancel()
c.processor.Lock() ckpt := c.checkpointer.CheckpointSet()
defer c.processor.Unlock() ckpt.Lock()
defer ckpt.Unlock()
c.processor.StartCollection() c.checkpointer.StartCollection()
c.accumulator.Collect(ctx) c.accumulator.Collect(ctx)
if err := c.processor.FinishCollection(); err != nil { if err := c.checkpointer.FinishCollection(); err != nil {
global.Handle(err) global.Handle(err)
} }
if err := c.exporter.Export(ctx, c.processor.CheckpointSet()); err != nil { if err := c.exporter.Export(ctx, ckpt); err != nil {
global.Handle(err) global.Handle(err)
} }
} }

View File

@ -30,10 +30,10 @@ import (
"go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric" export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/export/metric/metrictest"
"go.opentelemetry.io/otel/sdk/metric/controller/controllertest" "go.opentelemetry.io/otel/sdk/metric/controller/controllertest"
"go.opentelemetry.io/otel/sdk/metric/controller/push" "go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/processor/processortest" "go.opentelemetry.io/otel/sdk/metric/processor/basic"
processorTest "go.opentelemetry.io/otel/sdk/metric/processor/processortest"
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
) )
@ -65,85 +65,46 @@ func init() {
global.SetErrorHandler(testHandler) global.SetErrorHandler(testHandler)
} }
type testExporter struct { func newExporter() *processorTest.Exporter {
t *testing.T return processorTest.NewExporter(
lock sync.Mutex export.PassThroughExporter,
exports int label.DefaultEncoder(),
records []export.Record )
injectErr func(r export.Record) error
} }
type testFixture struct { func newCheckpointer() export.Checkpointer {
checkpointSet *metrictest.CheckpointSet return processorTest.Checkpointer(
exporter *testExporter processorTest.NewProcessor(
} processorTest.AggregatorSelector(),
label.DefaultEncoder(),
func newFixture(t *testing.T) testFixture { ),
checkpointSet := metrictest.NewCheckpointSet(testResource) )
exporter := &testExporter{
t: t,
}
return testFixture{
checkpointSet: checkpointSet,
exporter: exporter,
}
}
func (e *testExporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) export.ExportKind {
return export.PassThroughExporter
}
func (e *testExporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
e.lock.Lock()
defer e.lock.Unlock()
e.exports++
var records []export.Record
if err := checkpointSet.ForEach(e, func(r export.Record) error {
if e.injectErr != nil {
if err := e.injectErr(r); err != nil {
return err
}
}
records = append(records, r)
return nil
}); err != nil {
return err
}
e.records = records
return nil
}
func (e *testExporter) resetRecords() ([]export.Record, int) {
e.lock.Lock()
defer e.lock.Unlock()
r := e.records
e.records = nil
return r, e.exports
} }
func TestPushDoubleStop(t *testing.T) { func TestPushDoubleStop(t *testing.T) {
fix := newFixture(t) exporter := newExporter()
p := push.New(processortest.AggregatorSelector(), fix.exporter) checkpointer := newCheckpointer()
p := push.New(checkpointer, exporter)
p.Start() p.Start()
p.Stop() p.Stop()
p.Stop() p.Stop()
} }
func TestPushDoubleStart(t *testing.T) { func TestPushDoubleStart(t *testing.T) {
fix := newFixture(t) exporter := newExporter()
p := push.New(processortest.AggregatorSelector(), fix.exporter) checkpointer := newCheckpointer()
p := push.New(checkpointer, exporter)
p.Start() p.Start()
p.Start() p.Start()
p.Stop() p.Stop()
} }
func TestPushTicker(t *testing.T) { func TestPushTicker(t *testing.T) {
fix := newFixture(t) exporter := newExporter()
checkpointer := newCheckpointer()
p := push.New( p := push.New(
processortest.AggregatorSelector(), checkpointer,
fix.exporter, exporter,
push.WithPeriod(time.Second), push.WithPeriod(time.Second),
push.WithResource(testResource), push.WithResource(testResource),
) )
@ -160,39 +121,29 @@ func TestPushTicker(t *testing.T) {
counter.Add(ctx, 3) counter.Add(ctx, 3)
records, exports := fix.exporter.resetRecords() require.EqualValues(t, map[string]float64{}, exporter.Values())
require.Equal(t, 0, exports)
require.Equal(t, 0, len(records))
mock.Add(time.Second) mock.Add(time.Second)
runtime.Gosched() runtime.Gosched()
records, exports = fix.exporter.resetRecords() require.EqualValues(t, map[string]float64{
require.Equal(t, 1, exports) "counter.sum//R=V": 3,
require.Equal(t, 1, len(records)) }, exporter.Values())
require.Equal(t, "counter.sum", records[0].Descriptor().Name())
require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder()))
sum, err := records[0].Aggregation().(aggregation.Sum).Sum() require.Equal(t, 1, exporter.ExportCount())
require.Equal(t, int64(3), sum.AsInt64()) exporter.Reset()
require.Nil(t, err)
fix.checkpointSet.Reset()
counter.Add(ctx, 7) counter.Add(ctx, 7)
mock.Add(time.Second) mock.Add(time.Second)
runtime.Gosched() runtime.Gosched()
records, exports = fix.exporter.resetRecords() require.EqualValues(t, map[string]float64{
require.Equal(t, 2, exports) "counter.sum//R=V": 10,
require.Equal(t, 1, len(records)) }, exporter.Values())
require.Equal(t, "counter.sum", records[0].Descriptor().Name())
require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder()))
sum, err = records[0].Aggregation().(aggregation.Sum).Sum() require.Equal(t, 1, exporter.ExportCount())
require.Equal(t, int64(7), sum.AsInt64()) exporter.Reset()
require.Nil(t, err)
p.Stop() p.Stop()
} }
@ -208,23 +159,32 @@ func TestPushExportError(t *testing.T) {
} }
var errAggregator = fmt.Errorf("unexpected error") var errAggregator = fmt.Errorf("unexpected error")
var tests = []struct { var tests = []struct {
name string name string
injectedError error injectedError error
expectedDescriptors []string expected map[string]float64
expectedError error expectedError error
}{ }{
{"errNone", nil, []string{"counter1.sum{R=V,X=Y}", "counter2.sum{R=V,}"}, nil}, {"errNone", nil, map[string]float64{
{"errNoData", aggregation.ErrNoData, []string{"counter2.sum{R=V,}"}, nil}, "counter1.sum/X=Y/R=V": 3,
{"errUnexpected", errAggregator, []string{}, errAggregator}, "counter2.sum//R=V": 5,
}, nil},
{"errNoData", aggregation.ErrNoData, map[string]float64{
"counter2.sum//R=V": 5,
}, nil},
{"errUnexpected", errAggregator, map[string]float64{}, errAggregator},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
fix := newFixture(t) exporter := newExporter()
fix.exporter.injectErr = injector("counter1.sum", tt.injectedError) exporter.InjectErr = injector("counter1.sum", tt.injectedError)
// This test validates the error handling
// behavior of the basic Processor is honored
// by the push processor.
checkpointer := basic.New(processorTest.AggregatorSelector(), exporter)
p := push.New( p := push.New(
processortest.AggregatorSelector(), checkpointer,
fix.exporter, exporter,
push.WithPeriod(time.Second), push.WithPeriod(time.Second),
push.WithResource(testResource), push.WithResource(testResource),
) )
@ -244,31 +204,21 @@ func TestPushExportError(t *testing.T) {
counter1.Add(ctx, 3, kv.String("X", "Y")) counter1.Add(ctx, 3, kv.String("X", "Y"))
counter2.Add(ctx, 5) counter2.Add(ctx, 5)
require.Equal(t, 0, fix.exporter.exports) require.Equal(t, 0, exporter.ExportCount())
require.Nil(t, testHandler.Flush()) require.Nil(t, testHandler.Flush())
mock.Add(time.Second) mock.Add(time.Second)
runtime.Gosched() runtime.Gosched()
records, exports := fix.exporter.resetRecords() require.Equal(t, 1, exporter.ExportCount())
require.Equal(t, 1, exports)
if tt.expectedError == nil { if tt.expectedError == nil {
require.EqualValues(t, tt.expected, exporter.Values())
require.NoError(t, testHandler.Flush()) require.NoError(t, testHandler.Flush())
} else { } else {
err := testHandler.Flush() err := testHandler.Flush()
require.Error(t, err) require.Error(t, err)
require.Equal(t, tt.expectedError, err) require.Equal(t, tt.expectedError, err)
} }
require.Equal(t, len(tt.expectedDescriptors), len(records))
for _, r := range records {
require.Contains(t, tt.expectedDescriptors,
fmt.Sprintf("%s{%s,%s}",
r.Descriptor().Name(),
r.Resource().Encoded(label.DefaultEncoder()),
r.Labels().Encoded(label.DefaultEncoder()),
),
)
}
p.Stop() p.Stop()
}) })

View File

@ -69,6 +69,9 @@ func init() {
global.SetErrorHandler(testHandler) global.SetErrorHandler(testHandler)
} }
// correctnessProcessor could be replaced with processortest.Processor
// with a non-default aggregator selector. TODO(#872) use the
// processortest code here.
type correctnessProcessor struct { type correctnessProcessor struct {
t *testing.T t *testing.T
*testSelector *testSelector

View File

@ -117,11 +117,12 @@ type (
) )
var _ export.Processor = &Processor{} var _ export.Processor = &Processor{}
var _ export.Checkpointer = &Processor{}
var _ export.CheckpointSet = &state{} var _ export.CheckpointSet = &state{}
var ErrInconsistentState = fmt.Errorf("inconsistent processor state") var ErrInconsistentState = fmt.Errorf("inconsistent processor state")
var ErrInvalidExporterKind = fmt.Errorf("invalid exporter kind") var ErrInvalidExporterKind = fmt.Errorf("invalid exporter kind")
// New returns a basic Processor using the provided // New returns a basic Processor that is also a Checkpointer using the provided
// AggregatorSelector to select Aggregators. The ExportKindSelector // AggregatorSelector to select Aggregators. The ExportKindSelector
// is consulted to determine the kind(s) of exporter that will consume // is consulted to determine the kind(s) of exporter that will consume
// data, so that this Processor can prepare to compute Delta or // data, so that this Processor can prepare to compute Delta or

View File

@ -15,8 +15,10 @@
package processortest package processortest
import ( import (
"context"
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"go.opentelemetry.io/otel/api/label" "go.opentelemetry.io/otel/api/label"
@ -29,33 +31,128 @@ import (
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/resource"
) )
type ( type (
nameWithNumKind struct { // mapKey is the unique key for a metric, consisting of its
name string // unique descriptor, distinct labels, and distinct resource
numberKind metric.NumberKind // attributes.
mapKey struct {
desc *metric.Descriptor
labels label.Distinct
resource label.Distinct
} }
// Output collects distinct metric/label set outputs. // mapValue is value stored in a processor used to produce a
// CheckpointSet.
mapValue struct {
labels *label.Set
resource *resource.Resource
aggregator export.Aggregator
}
// Output implements export.CheckpointSet.
Output struct { Output struct {
m map[nameWithNumKind]export.Aggregator m map[mapKey]mapValue
labelEncoder label.Encoder labelEncoder label.Encoder
sync.RWMutex
} }
// testAggregatorSelector returns aggregators consistent with // testAggregatorSelector returns aggregators consistent with
// the test variables below, needed for testing stateful // the test variables below, needed for testing stateful
// processors, which clone Aggregators using AggregatorFor(desc). // processors, which clone Aggregators using AggregatorFor(desc).
testAggregatorSelector struct{} testAggregatorSelector struct{}
// testCheckpointer is a export.Checkpointer.
testCheckpointer struct {
started int
finished int
*Processor
}
// Processor is a testing implementation of export.Processor that
// assembles its results as a map[string]float64.
Processor struct {
export.AggregatorSelector
output *Output
}
// Exporter is a testing implementation of export.Exporter that
// assembles its results as a map[string]float64.
Exporter struct {
export.ExportKindSelector
output *Output
exportCount int
// InjectErr supports returning conditional errors from
// the Export() routine. This must be set before the
// Exporter is first used.
InjectErr func(export.Record) error
}
) )
func NewOutput(labelEncoder label.Encoder) Output { // NewProcessor returns a new testing Processor implementation.
return Output{ // Verify expected outputs using Values(), e.g.:
m: make(map[nameWithNumKind]export.Aggregator), //
labelEncoder: labelEncoder, // require.EqualValues(t, map[string]float64{
// "counter.sum/A=1,B=2/R=V": 100,
// }, processor.Values())
//
// Where in the example A=1,B=2 is the encoded labels and R=V is the
// encoded resource value.
func NewProcessor(selector export.AggregatorSelector, encoder label.Encoder) *Processor {
return &Processor{
AggregatorSelector: selector,
output: NewOutput(encoder),
} }
} }
// Process implements export.Processor.
func (p *Processor) Process(accum export.Accumulation) error {
return p.output.AddAccumulation(accum)
}
// Values returns the mapping from label set to point values for the
// accumulations that were processed. Point values are chosen as
// either the Sum or the LastValue, whichever is implemented. (All
// the built-in Aggregators implement one of these interfaces.)
func (p *Processor) Values() map[string]float64 {
return p.output.Map()
}
// Checkpointer returns a checkpointer that computes a single
// interval.
func Checkpointer(p *Processor) export.Checkpointer {
return &testCheckpointer{
Processor: p,
}
}
// StartCollection implements export.Checkpointer.
func (c *testCheckpointer) StartCollection() {
if c.started != c.finished {
panic(fmt.Sprintf("collection was already started: %d != %d", c.started, c.finished))
}
c.started++
}
// FinishCollection implements export.Checkpointer.
func (c *testCheckpointer) FinishCollection() error {
if c.started-1 != c.finished {
return fmt.Errorf("collection was not started: %d != %d", c.started, c.finished)
}
c.finished++
return nil
}
// CheckpointSet implements export.Checkpointer.
func (c *testCheckpointer) CheckpointSet() export.CheckpointSet {
return c.Processor.output
}
// AggregatorSelector returns a policy that is consistent with the // AggregatorSelector returns a policy that is consistent with the
// test descriptors above. I.e., it returns sum.New() for counter // test descriptors above. I.e., it returns sum.New() for counter
// instruments and lastvalue.New() for lastValue instruments. // instruments and lastvalue.New() for lastValue instruments.
@ -63,6 +160,7 @@ func AggregatorSelector() export.AggregatorSelector {
return testAggregatorSelector{} return testAggregatorSelector{}
} }
// AggregatorFor implements export.AggregatorSelector.
func (testAggregatorSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) { func (testAggregatorSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) {
switch { switch {
@ -105,49 +203,98 @@ func (testAggregatorSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...
} }
} }
// NewOutput is a helper for testing an expected set of Accumulations
// (from an Accumulator) or an expected set of Records (from a
// Processor). If testing with an Accumulator, it may be simpler to
// use the test Processor in this package.
func NewOutput(labelEncoder label.Encoder) *Output {
return &Output{
m: make(map[mapKey]mapValue),
labelEncoder: labelEncoder,
}
}
// ForEach implements export.CheckpointSet.
func (o *Output) ForEach(_ export.ExportKindSelector, ff func(export.Record) error) error {
for key, value := range o.m {
if err := ff(export.NewRecord(
key.desc,
value.labels,
value.resource,
value.aggregator.Aggregation(),
time.Time{},
time.Time{},
)); err != nil {
return err
}
}
return nil
}
// AddRecord adds a string representation of the exported metric data // AddRecord adds a string representation of the exported metric data
// to a map for use in testing. The value taken from the record is // to a map for use in testing. The value taken from the record is
// either the Sum() or the LastValue() of its Aggregation(), whichever // either the Sum() or the LastValue() of its Aggregation(), whichever
// is defined. Record timestamps are ignored. // is defined. Record timestamps are ignored.
func (o Output) AddRecord(rec export.Record) error { func (o *Output) AddRecord(rec export.Record) error {
encoded := rec.Labels().Encoded(o.labelEncoder) key := mapKey{
rencoded := rec.Resource().Encoded(o.labelEncoder) desc: rec.Descriptor(),
key := nameWithNumKind{ labels: rec.Labels().Equivalent(),
name: fmt.Sprint(rec.Descriptor().Name(), "/", encoded, "/", rencoded), resource: rec.Resource().Equivalent(),
numberKind: rec.Descriptor().NumberKind(),
} }
if _, ok := o.m[key]; !ok { if _, ok := o.m[key]; !ok {
var agg export.Aggregator var agg export.Aggregator
testAggregatorSelector{}.AggregatorFor(rec.Descriptor(), &agg) testAggregatorSelector{}.AggregatorFor(rec.Descriptor(), &agg)
o.m[key] = agg o.m[key] = mapValue{
aggregator: agg,
labels: rec.Labels(),
resource: rec.Resource(),
}
} }
return o.m[key].Merge(rec.Aggregation().(export.Aggregator), rec.Descriptor()) return o.m[key].aggregator.Merge(rec.Aggregation().(export.Aggregator), rec.Descriptor())
} }
func (o Output) Map() map[string]float64 { // Map returns the calculated values for test validation from a set of
// Accumulations or a set of Records. When mapping records or
// accumulations into floating point values, the Sum() or LastValue()
// is chosen, whichever is implemented by the underlying Aggregator.
func (o *Output) Map() map[string]float64 {
r := make(map[string]float64) r := make(map[string]float64)
for nnk, agg := range o.m { err := o.ForEach(export.PassThroughExporter, func(record export.Record) error {
value := 0.0 for key, value := range o.m {
if s, ok := agg.(aggregation.Sum); ok { encoded := value.labels.Encoded(o.labelEncoder)
sum, _ := s.Sum() rencoded := value.resource.Encoded(o.labelEncoder)
value = sum.CoerceToFloat64(nnk.numberKind) number := 0.0
} else if l, ok := agg.(aggregation.LastValue); ok { if s, ok := value.aggregator.(aggregation.Sum); ok {
last, _, _ := l.LastValue() sum, _ := s.Sum()
value = last.CoerceToFloat64(nnk.numberKind) number = sum.CoerceToFloat64(key.desc.NumberKind())
} else { } else if l, ok := value.aggregator.(aggregation.LastValue); ok {
panic(fmt.Sprintf("Unhandled aggregator type: %T", agg)) last, _, _ := l.LastValue()
number = last.CoerceToFloat64(key.desc.NumberKind())
} else {
panic(fmt.Sprintf("Unhandled aggregator type: %T", value.aggregator))
}
name := fmt.Sprint(key.desc.Name(), "/", encoded, "/", rencoded)
r[name] = number
} }
r[nnk.name] = value return nil
})
if err != nil {
panic(fmt.Sprint("Unexpected processor error: ", err))
} }
return r return r
} }
// Reset restores the Output to its initial state, with no accumulated
// metric data.
func (o *Output) Reset() {
o.m = map[mapKey]mapValue{}
}
// AddAccumulation adds a string representation of the exported metric // AddAccumulation adds a string representation of the exported metric
// data to a map for use in testing. The value taken from the // data to a map for use in testing. The value taken from the
// accumulation is either the Sum() or the LastValue() of its // accumulation is either the Sum() or the LastValue() of its
// Aggregator().Aggregation(), whichever is defined. // Aggregator().Aggregation(), whichever is defined.
func (o Output) AddAccumulation(acc export.Accumulation) error { func (o *Output) AddAccumulation(acc export.Accumulation) error {
return o.AddRecord( return o.AddRecord(
export.NewRecord( export.NewRecord(
acc.Descriptor(), acc.Descriptor(),
@ -159,3 +306,60 @@ func (o Output) AddAccumulation(acc export.Accumulation) error {
), ),
) )
} }
// NewExporter returns a new testing Exporter implementation.
// Verify exporter outputs using Values(), e.g.,:
//
// require.EqualValues(t, map[string]float64{
// "counter.sum/A=1,B=2/R=V": 100,
// }, exporter.Values())
//
// Where in the example A=1,B=2 is the encoded labels and R=V is the
// encoded resource value.
func NewExporter(selector export.ExportKindSelector, encoder label.Encoder) *Exporter {
return &Exporter{
ExportKindSelector: selector,
output: NewOutput(encoder),
}
}
func (e *Exporter) Export(_ context.Context, ckpt export.CheckpointSet) error {
e.output.Lock()
defer e.output.Unlock()
e.exportCount++
return ckpt.ForEach(e.ExportKindSelector, func(r export.Record) error {
if e.InjectErr != nil {
if err := e.InjectErr(r); err != nil {
return err
}
}
return e.output.AddRecord(r)
})
}
// Values returns the mapping from label set to point values for the
// accumulations that were processed. Point values are chosen as
// either the Sum or the LastValue, whichever is implemented. (All
// the built-in Aggregators implement one of these interfaces.)
func (e *Exporter) Values() map[string]float64 {
e.output.Lock()
defer e.output.Unlock()
return e.output.Map()
}
// ExportCount returns the number of times Export() has been called
// since the last Reset().
func (e *Exporter) ExportCount() int {
e.output.Lock()
defer e.output.Unlock()
return e.exportCount
}
// Reset sets the exporter's output to the initial, empty state and
// resets the export count to zero.
func (e *Exporter) Reset() {
e.output.Lock()
defer e.output.Unlock()
e.output.Reset()
e.exportCount = 0
}

View File

@ -0,0 +1,87 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package processortest_test
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
processorTest "go.opentelemetry.io/otel/sdk/metric/processor/processortest"
"go.opentelemetry.io/otel/sdk/resource"
)
func generateTestData(proc export.Processor) {
ctx := context.Background()
accum := metricsdk.NewAccumulator(
proc,
metricsdk.WithResource(
resource.New(kv.String("R", "V")),
),
)
meter := metric.WrapMeterImpl(accum, "testing")
counter := metric.Must(meter).NewFloat64Counter("counter.sum")
_ = metric.Must(meter).NewInt64SumObserver("observer.sum",
func(_ context.Context, result metric.Int64ObserverResult) {
result.Observe(10, kv.String("K1", "V1"))
result.Observe(11, kv.String("K1", "V2"))
},
)
counter.Add(ctx, 100, kv.String("K1", "V1"))
counter.Add(ctx, 101, kv.String("K1", "V2"))
accum.Collect(ctx)
}
func TestProcessorTesting(t *testing.T) {
// Test the Processor test helper using a real Accumulator to
// generate Accumulations.
testProc := processorTest.NewProcessor(
processorTest.AggregatorSelector(),
label.DefaultEncoder(),
)
checkpointer := processorTest.Checkpointer(testProc)
generateTestData(checkpointer)
expect := map[string]float64{
"counter.sum/K1=V1/R=V": 100,
"counter.sum/K1=V2/R=V": 101,
"observer.sum/K1=V1/R=V": 10,
"observer.sum/K1=V2/R=V": 11,
}
// Validate the processor's checkpoint directly.
require.EqualValues(t, expect, testProc.Values())
// Export the data and validate it again.
exporter := processorTest.NewExporter(
export.PassThroughExporter,
label.DefaultEncoder(),
)
err := exporter.Export(context.Background(), checkpointer.CheckpointSet())
require.NoError(t, err)
require.EqualValues(t, expect, exporter.Values())
}