diff --git a/example/prometheus/main.go b/example/prometheus/main.go index 4fbf94baa..378eb98e6 100644 --- a/example/prometheus/main.go +++ b/example/prometheus/main.go @@ -33,11 +33,11 @@ var ( ) func initMeter() *push.Controller { - pusher, hf, err := prometheus.InstallNewPipeline(prometheus.Config{}) + pusher, exporter, err := prometheus.InstallNewPipeline(prometheus.Config{}) if err != nil { log.Panicf("failed to initialize prometheus exporter %v", err) } - http.HandleFunc("/", hf) + http.HandleFunc("/", exporter.ServeHTTP) go func() { _ = http.ListenAndServe(":2222", nil) }() diff --git a/exporters/metric/prometheus/prometheus.go b/exporters/metric/prometheus/prometheus.go index 86bedf02a..44f7fb016 100644 --- a/exporters/metric/prometheus/prometheus.go +++ b/exporters/metric/prometheus/prometheus.go @@ -18,7 +18,7 @@ import ( "context" "fmt" "net/http" - "time" + "sync" "go.opentelemetry.io/otel/api/metric" @@ -30,7 +30,6 @@ import ( export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/controller/push" - integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple" "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) @@ -42,6 +41,7 @@ type Exporter struct { registerer prometheus.Registerer gatherer prometheus.Gatherer + lock sync.RWMutex snapshot export.CheckpointSet onError func(error) @@ -134,41 +134,49 @@ func NewRawExporter(config Config) (*Exporter, error) { // http.HandleFunc("/metrics", hf) // defer pipeline.Stop() // ... Done -func InstallNewPipeline(config Config) (*push.Controller, http.HandlerFunc, error) { - controller, hf, err := NewExportPipeline(config, time.Minute) +func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller, *Exporter, error) { + controller, exp, err := NewExportPipeline(config, options...) if err != nil { - return controller, hf, err + return controller, exp, err } global.SetMeterProvider(controller.Provider()) - return controller, hf, err + return controller, exp, err } // NewExportPipeline sets up a complete export pipeline with the recommended setup, // chaining a NewRawExporter into the recommended selectors and integrators. -func NewExportPipeline(config Config, period time.Duration) (*push.Controller, http.HandlerFunc, error) { - selector := simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries) +// +// The returned Controller contains an implementation of +// `metric.Provider`. The controller is returned unstarted and should +// be started by the caller to begin collection. +func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, *Exporter, error) { exporter, err := NewRawExporter(config) if err != nil { return nil, nil, err } - // Prometheus needs to use a stateful integrator since counters (and histogram since they are a collection of Counters) - // are cumulative (i.e., monotonically increasing values) and should not be resetted after each export. + // Prometheus uses a stateful push controller since instruments are + // cumulative and should not be reset after each collection interval. // // Prometheus uses this approach to be resilient to scrape failures. // If a Prometheus server tries to scrape metrics from a host and fails for some reason, // it could try again on the next scrape and no data would be lost, only resolution. // // Gauges (or LastValues) and Summaries are an exception to this and have different behaviors. - integrator := integrator.New(selector, true) - pusher := push.New(integrator, exporter, period) - pusher.Start() + pusher := push.New( + simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries), + exporter, + append(options, push.WithStateful(true))..., + ) - return pusher, exporter.ServeHTTP, nil + return pusher, exporter, nil } // Export exports the provide metric record to prometheus. func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { + // TODO: Use the resource value in this exporter. + e.lock.Lock() + defer e.lock.Unlock() e.snapshot = checkpointSet return nil } @@ -187,10 +195,16 @@ func newCollector(exporter *Exporter) *collector { } func (c *collector) Describe(ch chan<- *prometheus.Desc) { + c.exp.lock.RLock() + defer c.exp.lock.RUnlock() + if c.exp.snapshot == nil { return } + c.exp.snapshot.RLock() + defer c.exp.snapshot.RUnlock() + _ = c.exp.snapshot.ForEach(func(record export.Record) error { ch <- c.toDesc(&record) return nil @@ -202,10 +216,16 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) { // Collect is invoked whenever prometheus.Gatherer is also invoked. // For example, when the HTTP endpoint is invoked by Prometheus. func (c *collector) Collect(ch chan<- prometheus.Metric) { + c.exp.lock.RLock() + defer c.exp.lock.RUnlock() + if c.exp.snapshot == nil { return } + c.exp.snapshot.RLock() + defer c.exp.snapshot.RUnlock() + err := c.exp.snapshot.ForEach(func(record export.Record) error { agg := record.Aggregator() numberKind := record.Descriptor().NumberKind() diff --git a/exporters/metric/prometheus/prometheus_test.go b/exporters/metric/prometheus/prometheus_test.go index a95d09ad3..254cd1e55 100644 --- a/exporters/metric/prometheus/prometheus_test.go +++ b/exporters/metric/prometheus/prometheus_test.go @@ -15,31 +15,35 @@ package prometheus_test import ( + "bytes" "context" - "log" + "io/ioutil" + "net/http" "net/http/httptest" + "runtime" "sort" "strings" "testing" + "time" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporters/metric/prometheus" - "go.opentelemetry.io/otel/exporters/metric/test" + exportTest "go.opentelemetry.io/otel/exporters/metric/test" + "go.opentelemetry.io/otel/sdk/metric/controller/push" + controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test" ) func TestPrometheusExporter(t *testing.T) { exporter, err := prometheus.NewRawExporter(prometheus.Config{ DefaultSummaryQuantiles: []float64{0.5, 0.9, 0.99}, }) - if err != nil { - log.Panicf("failed to initialize prometheus exporter %v", err) - } + require.NoError(t, err) var expected []string - checkpointSet := test.NewCheckpointSet(nil) + checkpointSet := exportTest.NewCheckpointSet(nil) counter := metric.NewDescriptor( "counter", metric.CounterKind, metric.Float64NumberKind) @@ -116,7 +120,7 @@ func TestPrometheusExporter(t *testing.T) { compareExport(t, exporter, checkpointSet, expected) } -func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *test.CheckpointSet, expected []string) { +func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *exportTest.CheckpointSet, expected []string) { err := exporter.Export(context.Background(), checkpointSet) require.Nil(t, err) @@ -138,3 +142,58 @@ func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *t require.Equal(t, strings.Join(expected, "\n"), strings.Join(metricsOnly, "\n")) } + +func TestPrometheusStatefulness(t *testing.T) { + // Create a meter + controller, exporter, err := prometheus.NewExportPipeline(prometheus.Config{}, push.WithPeriod(time.Minute)) + require.NoError(t, err) + + meter := controller.Provider().Meter("test") + mock := controllerTest.NewMockClock() + controller.SetClock(mock) + controller.Start() + + // GET the HTTP endpoint + scrape := func() string { + var input bytes.Buffer + resp := httptest.NewRecorder() + req, err := http.NewRequest("GET", "/", &input) + require.NoError(t, err) + + exporter.ServeHTTP(resp, req) + data, err := ioutil.ReadAll(resp.Result().Body) + require.NoError(t, err) + + return string(data) + } + + ctx := context.Background() + + counter := metric.Must(meter).NewInt64Counter( + "a.counter", + metric.WithDescription("Counts things"), + ) + + counter.Add(ctx, 100, kv.String("key", "value")) + + // Trigger a push + mock.Add(time.Minute) + runtime.Gosched() + + require.Equal(t, `# HELP a_counter Counts things +# TYPE a_counter counter +a_counter{key="value"} 100 +`, scrape()) + + counter.Add(ctx, 100, kv.String("key", "value")) + + // Again, now expect cumulative count + mock.Add(time.Minute) + runtime.Gosched() + + require.Equal(t, `# HELP a_counter Counts things +# TYPE a_counter counter +a_counter{key="value"} 200 +`, scrape()) + +} diff --git a/exporters/metric/stdout/example_test.go b/exporters/metric/stdout/example_test.go index 0952b41c9..306a9f501 100644 --- a/exporters/metric/stdout/example_test.go +++ b/exporters/metric/stdout/example_test.go @@ -17,7 +17,6 @@ package stdout_test import ( "context" "log" - "time" "go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/api/metric" @@ -29,7 +28,7 @@ func ExampleNewExportPipeline() { pusher, err := stdout.NewExportPipeline(stdout.Config{ PrettyPrint: true, DoNotPrintTime: true, - }, time.Minute) + }) if err != nil { log.Fatal("Could not initialize stdout exporter:", err) } diff --git a/exporters/metric/stdout/stdout.go b/exporters/metric/stdout/stdout.go index 433288503..270edaa02 100644 --- a/exporters/metric/stdout/stdout.go +++ b/exporters/metric/stdout/stdout.go @@ -29,7 +29,6 @@ import ( export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/controller/push" - integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple" "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) @@ -120,8 +119,8 @@ func NewRawExporter(config Config) (*Exporter, error) { // } // defer pipeline.Stop() // ... Done -func InstallNewPipeline(config Config, opts ...push.Option) (*push.Controller, error) { - controller, err := NewExportPipeline(config, time.Minute, opts...) +func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller, error) { + controller, err := NewExportPipeline(config, options...) if err != nil { return controller, err } @@ -129,16 +128,22 @@ func InstallNewPipeline(config Config, opts ...push.Option) (*push.Controller, e return controller, err } -// NewExportPipeline sets up a complete export pipeline with the recommended setup, -// chaining a NewRawExporter into the recommended selectors and integrators. -func NewExportPipeline(config Config, period time.Duration, opts ...push.Option) (*push.Controller, error) { - selector := simple.NewWithExactDistribution() +// NewExportPipeline sets up a complete export pipeline with the +// recommended setup, chaining a NewRawExporter into the recommended +// selectors and integrators. +// +// The pipeline is configured with a stateful integrator unless the +// push.WithStateful(false) option is used. +func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, error) { exporter, err := NewRawExporter(config) if err != nil { return nil, err } - integrator := integrator.New(selector, true) - pusher := push.New(integrator, exporter, period, opts...) + pusher := push.New( + simple.NewWithExactDistribution(), + exporter, + append([]push.Option{push.WithStateful(true)}, options...)..., + ) pusher.Start() return pusher, nil diff --git a/exporters/metric/test/test.go b/exporters/metric/test/test.go index cb99b6489..9fd639493 100644 --- a/exporters/metric/test/test.go +++ b/exporters/metric/test/test.go @@ -17,6 +17,7 @@ package test import ( "context" "errors" + "sync" "go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/api/label" @@ -36,9 +37,10 @@ type mapkey struct { } type CheckpointSet struct { + sync.RWMutex records map[mapkey]export.Record - resource *resource.Resource updates []export.Record + resource *resource.Resource } // NewCheckpointSet returns a test CheckpointSet that new records could be added. diff --git a/exporters/otlp/otlp_integration_test.go b/exporters/otlp/otlp_integration_test.go index a7a764a49..e4f61fdb6 100644 --- a/exporters/otlp/otlp_integration_test.go +++ b/exporters/otlp/otlp_integration_test.go @@ -111,7 +111,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) selector := simple.NewWithExactDistribution() integrator := integrator.New(selector, true) - pusher := push.New(integrator, exp, 60*time.Second) + pusher := push.New(integrator, exp) pusher.Start() ctx := context.Background() diff --git a/exporters/otlp/otlp_metric_test.go b/exporters/otlp/otlp_metric_test.go index db47cb157..390a59995 100644 --- a/exporters/otlp/otlp_metric_test.go +++ b/exporters/otlp/otlp_metric_test.go @@ -16,6 +16,7 @@ package otlp import ( "context" + "sync" "testing" colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1" @@ -60,10 +61,11 @@ func (m *metricsServiceClientStub) Reset() { } type checkpointSet struct { + sync.RWMutex records []metricsdk.Record } -func (m checkpointSet) ForEach(fn func(metricsdk.Record) error) error { +func (m *checkpointSet) ForEach(fn func(metricsdk.Record) error) error { for _, r := range m.records { if err := fn(r); err != nil && err != aggregator.ErrNoData { return err @@ -662,7 +664,7 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, r.resource, agg)) } for _, records := range recs { - assert.NoError(t, exp.Export(context.Background(), checkpointSet{records: records})) + assert.NoError(t, exp.Export(context.Background(), &checkpointSet{records: records})) } // assert.ElementsMatch does not equate nested slices of different order, @@ -726,7 +728,7 @@ func TestEmptyMetricExport(t *testing.T) { }, } { msc.Reset() - require.NoError(t, exp.Export(context.Background(), checkpointSet{records: test.records})) + require.NoError(t, exp.Export(context.Background(), &checkpointSet{records: test.records})) assert.Equal(t, test.want, msc.ResourceMetrics()) } } diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index 86f195aa1..f22cdec0f 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/export/metric" import ( "context" + "sync" "go.opentelemetry.io/otel/api/label" "go.opentelemetry.io/otel/api/metric" @@ -39,10 +40,6 @@ import ( // single-threaded context from the SDK, after the aggregator is // checkpointed, allowing the integrator to build the set of metrics // currently being exported. -// -// The `CheckpointSet` method is called during collection in a -// single-threaded context from the Exporter, giving the exporter -// access to a producer for iterating over the complete checkpoint. type Integrator interface { // AggregationSelector is responsible for selecting the // concrete type of Aggregator used for a metric in the SDK. @@ -70,17 +67,6 @@ type Integrator interface { // The Context argument originates from the controller that // orchestrates collection. Process(ctx context.Context, record Record) error - - // CheckpointSet is the interface used by the controller to - // access the fully aggregated checkpoint after collection. - // - // The returned CheckpointSet is passed to the Exporter. - CheckpointSet() CheckpointSet - - // FinishedCollection informs the Integrator that a complete - // collection round was completed. Stateless integrators might - // reset state in this method, for example. - FinishedCollection() } // AggregationSelector supports selecting the kind of Aggregator to @@ -173,6 +159,19 @@ type CheckpointSet interface { // of error will immediately halt ForEach and return // the error to the caller. ForEach(func(Record) error) error + + // Locker supports locking the checkpoint set. Collection + // into the checkpoint set cannot take place (in case of a + // stateful integrator) while it is locked. + // + // The Integrator attached to the Accumulator MUST be called + // with the lock held. + sync.Locker + + // RLock acquires a read lock corresponding to this Locker. + RLock() + // RUnlock releases a read lock corresponding to this Locker. + RUnlock() } // Record contains the exported data for a single metric instrument diff --git a/sdk/metric/controller/push/config.go b/sdk/metric/controller/push/config.go index 2b2b86b71..fccd02df0 100644 --- a/sdk/metric/controller/push/config.go +++ b/sdk/metric/controller/push/config.go @@ -15,6 +15,8 @@ package push import ( + "time" + sdk "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" ) @@ -30,6 +32,14 @@ type Config struct { // Resource is the OpenTelemetry resource associated with all Meters // created by the Controller. Resource *resource.Resource + + // Stateful causes the controller to maintain state across + // collection events, so that records in the exported + // checkpoint set are cumulative. + Stateful bool + + // Period is the interval between calls to Collect a checkpoint. + Period time.Duration } // Option is the interface that applies the value to a configuration option. @@ -59,3 +69,25 @@ type resourceOption struct{ *resource.Resource } func (o resourceOption) Apply(config *Config) { config.Resource = o.Resource } + +// WithStateful sets the Stateful configuration option of a Config. +func WithStateful(stateful bool) Option { + return statefulOption(stateful) +} + +type statefulOption bool + +func (o statefulOption) Apply(config *Config) { + config.Stateful = bool(o) +} + +// WithPeriod sets the Period configuration option of a Config. +func WithPeriod(period time.Duration) Option { + return periodOption(period) +} + +type periodOption time.Duration + +func (o periodOption) Apply(config *Config) { + config.Period = time.Duration(o) +} diff --git a/sdk/metric/controller/push/push.go b/sdk/metric/controller/push/push.go index ea347d3fa..0a009780b 100644 --- a/sdk/metric/controller/push/push.go +++ b/sdk/metric/controller/push/push.go @@ -23,77 +23,61 @@ import ( "go.opentelemetry.io/otel/api/metric/registry" export "go.opentelemetry.io/otel/sdk/export/metric" sdk "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" + controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time" + "go.opentelemetry.io/otel/sdk/metric/integrator/simple" ) +// DefaultPushPeriod is the default time interval between pushes. +const DefaultPushPeriod = 10 * time.Second + // Controller organizes a periodic push of metric data. type Controller struct { lock sync.Mutex - collectLock sync.Mutex accumulator *sdk.Accumulator provider *registry.Provider errorHandler sdk.ErrorHandler - integrator export.Integrator + integrator *simple.Integrator exporter export.Exporter wg sync.WaitGroup ch chan struct{} period time.Duration - ticker Ticker - clock Clock + clock controllerTime.Clock + ticker controllerTime.Ticker } -// Several types below are created to match "github.com/benbjohnson/clock" -// so that it remains a test-only dependency. - -type Clock interface { - Now() time.Time - Ticker(time.Duration) Ticker -} - -type Ticker interface { - Stop() - C() <-chan time.Time -} - -type realClock struct { -} - -type realTicker struct { - ticker *time.Ticker -} - -var _ Clock = realClock{} -var _ Ticker = realTicker{} - // New constructs a Controller, an implementation of metric.Provider, -// using the provided integrator, exporter, collection period, and SDK -// configuration options to configure an SDK with periodic collection. -// The integrator itself is configured with the aggregation selector policy. -func New(integrator export.Integrator, exporter export.Exporter, period time.Duration, opts ...Option) *Controller { +// using the provided exporter and options to configure an SDK with +// periodic collection. +func New(selector export.AggregationSelector, exporter export.Exporter, opts ...Option) *Controller { c := &Config{ ErrorHandler: sdk.DefaultErrorHandler, - Resource: resource.Empty(), + Period: DefaultPushPeriod, } for _, opt := range opts { opt.Apply(c) } - impl := sdk.NewAccumulator(integrator, sdk.WithErrorHandler(c.ErrorHandler), sdk.WithResource(c.Resource)) + integrator := simple.New(selector, c.Stateful) + impl := sdk.NewAccumulator( + integrator, + sdk.WithErrorHandler(c.ErrorHandler), + sdk.WithResource(c.Resource), + ) return &Controller{ - accumulator: impl, provider: registry.NewProvider(impl), - errorHandler: c.ErrorHandler, + accumulator: impl, integrator: integrator, exporter: exporter, + errorHandler: c.ErrorHandler, ch: make(chan struct{}), - period: period, - clock: realClock{}, + period: c.Period, + clock: controllerTime.RealClock{}, } } // SetClock supports setting a mock clock for testing. This must be // called before Start(). -func (c *Controller) SetClock(clock Clock) { +func (c *Controller) SetClock(clock controllerTime.Clock) { c.lock.Lock() defer c.lock.Unlock() c.clock = clock @@ -162,53 +146,15 @@ func (c *Controller) tick() { // TODO: either remove the context argument from Export() or // configure a timeout here? ctx := context.Background() - c.collect(ctx) - checkpointSet := syncCheckpointSet{ - mtx: &c.collectLock, - delegate: c.integrator.CheckpointSet(), - } - err := c.exporter.Export(ctx, checkpointSet) + c.integrator.Lock() + defer c.integrator.Unlock() + + c.accumulator.Collect(ctx) + + err := c.exporter.Export(ctx, c.integrator.CheckpointSet()) c.integrator.FinishedCollection() if err != nil { c.errorHandler(err) } } - -func (c *Controller) collect(ctx context.Context) { - c.collectLock.Lock() - defer c.collectLock.Unlock() - - c.accumulator.Collect(ctx) -} - -// syncCheckpointSet is a wrapper for a CheckpointSet to synchronize -// SDK's collection and reads of a CheckpointSet by an exporter. -type syncCheckpointSet struct { - mtx *sync.Mutex - delegate export.CheckpointSet -} - -var _ export.CheckpointSet = (*syncCheckpointSet)(nil) - -func (c syncCheckpointSet) ForEach(fn func(export.Record) error) error { - c.mtx.Lock() - defer c.mtx.Unlock() - return c.delegate.ForEach(fn) -} - -func (realClock) Now() time.Time { - return time.Now() -} - -func (realClock) Ticker(period time.Duration) Ticker { - return realTicker{time.NewTicker(period)} -} - -func (t realTicker) Stop() { - t.ticker.Stop() -} - -func (t realTicker) C() <-chan time.Time { - return t.ticker.C -} diff --git a/sdk/metric/controller/push/push_test.go b/sdk/metric/controller/push/push_test.go index dc4ae94c6..ad7945100 100644 --- a/sdk/metric/controller/push/push_test.go +++ b/sdk/metric/controller/push/push_test.go @@ -22,7 +22,6 @@ import ( "testing" "time" - "github.com/benbjohnson/clock" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/api/kv" @@ -33,17 +32,10 @@ import ( "go.opentelemetry.io/otel/sdk/export/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/controller/push" + controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test" "go.opentelemetry.io/otel/sdk/resource" ) -type testIntegrator struct { - t *testing.T - lock sync.Mutex - checkpointSet *test.CheckpointSet - checkpoints int - finishes int -} - var testResource = resource.New(kv.String("R", "V")) type testExporter struct { @@ -56,69 +48,27 @@ type testExporter struct { type testFixture struct { checkpointSet *test.CheckpointSet - integrator *testIntegrator exporter *testExporter } -type mockClock struct { - mock *clock.Mock -} - -type mockTicker struct { - ticker *clock.Ticker -} - -var _ push.Clock = mockClock{} -var _ push.Ticker = mockTicker{} +type testSelector struct{} func newFixture(t *testing.T) testFixture { checkpointSet := test.NewCheckpointSet(testResource) - integrator := &testIntegrator{ - t: t, - checkpointSet: checkpointSet, - } exporter := &testExporter{ t: t, } return testFixture{ checkpointSet: checkpointSet, - integrator: integrator, exporter: exporter, } } -func (b *testIntegrator) AggregatorFor(*metric.Descriptor) export.Aggregator { +func (testSelector) AggregatorFor(*metric.Descriptor) export.Aggregator { return sum.New() } -func (b *testIntegrator) CheckpointSet() export.CheckpointSet { - b.lock.Lock() - defer b.lock.Unlock() - b.checkpoints++ - return b.checkpointSet -} - -func (b *testIntegrator) FinishedCollection() { - b.lock.Lock() - defer b.lock.Unlock() - b.finishes++ -} - -func (b *testIntegrator) Process(_ context.Context, record export.Record) error { - b.lock.Lock() - defer b.lock.Unlock() - labels := record.Labels().ToSlice() - b.checkpointSet.Add(record.Descriptor(), record.Aggregator(), labels...) - return nil -} - -func (b *testIntegrator) getCounts() (checkpoints, finishes int) { - b.lock.Lock() - defer b.lock.Unlock() - return b.checkpoints, b.finishes -} - func (e *testExporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { e.lock.Lock() defer e.lock.Unlock() @@ -147,29 +97,9 @@ func (e *testExporter) resetRecords() ([]export.Record, int) { return r, e.exports } -func (c mockClock) Now() time.Time { - return c.mock.Now() -} - -func (c mockClock) Ticker(period time.Duration) push.Ticker { - return mockTicker{c.mock.Ticker(period)} -} - -func (c mockClock) Add(d time.Duration) { - c.mock.Add(d) -} - -func (t mockTicker) Stop() { - t.ticker.Stop() -} - -func (t mockTicker) C() <-chan time.Time { - return t.ticker.C -} - func TestPushDoubleStop(t *testing.T) { fix := newFixture(t) - p := push.New(fix.integrator, fix.exporter, time.Second) + p := push.New(testSelector{}, fix.exporter) p.Start() p.Stop() p.Stop() @@ -177,7 +107,7 @@ func TestPushDoubleStop(t *testing.T) { func TestPushDoubleStart(t *testing.T) { fix := newFixture(t) - p := push.New(fix.integrator, fix.exporter, time.Second) + p := push.New(testSelector{}, fix.exporter) p.Start() p.Start() p.Stop() @@ -186,10 +116,15 @@ func TestPushDoubleStart(t *testing.T) { func TestPushTicker(t *testing.T) { fix := newFixture(t) - p := push.New(fix.integrator, fix.exporter, time.Second) + p := push.New( + testSelector{}, + fix.exporter, + push.WithPeriod(time.Second), + push.WithResource(testResource), + ) meter := p.Provider().Meter("name") - mock := mockClock{clock.NewMock()} + mock := controllerTest.NewMockClock() p.SetClock(mock) ctx := context.Background() @@ -201,9 +136,6 @@ func TestPushTicker(t *testing.T) { counter.Add(ctx, 3) records, exports := fix.exporter.resetRecords() - checkpoints, finishes := fix.integrator.getCounts() - require.Equal(t, 0, checkpoints) - require.Equal(t, 0, finishes) require.Equal(t, 0, exports) require.Equal(t, 0, len(records)) @@ -211,9 +143,6 @@ func TestPushTicker(t *testing.T) { runtime.Gosched() records, exports = fix.exporter.resetRecords() - checkpoints, finishes = fix.integrator.getCounts() - require.Equal(t, 1, checkpoints) - require.Equal(t, 1, finishes) require.Equal(t, 1, exports) require.Equal(t, 1, len(records)) require.Equal(t, "counter", records[0].Descriptor().Name()) @@ -231,9 +160,6 @@ func TestPushTicker(t *testing.T) { runtime.Gosched() records, exports = fix.exporter.resetRecords() - checkpoints, finishes = fix.integrator.getCounts() - require.Equal(t, 2, checkpoints) - require.Equal(t, 2, finishes) require.Equal(t, 2, exports) require.Equal(t, 1, len(records)) require.Equal(t, "counter", records[0].Descriptor().Name()) @@ -271,7 +197,12 @@ func TestPushExportError(t *testing.T) { fix := newFixture(t) fix.exporter.injectErr = injector("counter1", tt.injectedError) - p := push.New(fix.integrator, fix.exporter, time.Second) + p := push.New( + testSelector{}, + fix.exporter, + push.WithPeriod(time.Second), + push.WithResource(testResource), + ) var err error var lock sync.Mutex @@ -281,7 +212,7 @@ func TestPushExportError(t *testing.T) { err = sdkErr }) - mock := mockClock{clock.NewMock()} + mock := controllerTest.NewMockClock() p.SetClock(mock) ctx := context.Background() @@ -303,10 +234,7 @@ func TestPushExportError(t *testing.T) { runtime.Gosched() records, exports := fix.exporter.resetRecords() - checkpoints, finishes := fix.integrator.getCounts() require.Equal(t, 1, exports) - require.Equal(t, 1, checkpoints) - require.Equal(t, 1, finishes) lock.Lock() if tt.expectedError == nil { require.NoError(t, err) diff --git a/sdk/metric/controller/test/test.go b/sdk/metric/controller/test/test.go new file mode 100644 index 000000000..f2c2e7447 --- /dev/null +++ b/sdk/metric/controller/test/test.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "time" + + "github.com/benbjohnson/clock" + + controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time" +) + +type MockClock struct { + mock *clock.Mock +} + +type MockTicker struct { + ticker *clock.Ticker +} + +var _ controllerTime.Clock = MockClock{} +var _ controllerTime.Ticker = MockTicker{} + +func NewMockClock() MockClock { + return MockClock{clock.NewMock()} +} + +func (c MockClock) Now() time.Time { + return c.mock.Now() +} + +func (c MockClock) Ticker(period time.Duration) controllerTime.Ticker { + return MockTicker{c.mock.Ticker(period)} +} + +func (c MockClock) Add(d time.Duration) { + c.mock.Add(d) +} + +func (t MockTicker) Stop() { + t.ticker.Stop() +} + +func (t MockTicker) C() <-chan time.Time { + return t.ticker.C +} diff --git a/sdk/metric/controller/time/time.go b/sdk/metric/controller/time/time.go new file mode 100644 index 000000000..9d0e4eb79 --- /dev/null +++ b/sdk/metric/controller/time/time.go @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package time // import "go.opentelemetry.io/otel/sdk/metric/controller/time" + +import ( + "time" + lib "time" +) + +// Several types below are created to match "github.com/benbjohnson/clock" +// so that it remains a test-only dependency. + +type Clock interface { + Now() lib.Time + Ticker(lib.Duration) Ticker +} + +type Ticker interface { + Stop() + C() <-chan lib.Time +} + +type RealClock struct { +} + +type RealTicker struct { + ticker *lib.Ticker +} + +var _ Clock = RealClock{} +var _ Ticker = RealTicker{} + +func (RealClock) Now() time.Time { + return time.Now() +} + +func (RealClock) Ticker(period time.Duration) Ticker { + return RealTicker{time.NewTicker(period)} +} + +func (t RealTicker) Stop() { + t.ticker.Stop() +} + +func (t RealTicker) C() <-chan time.Time { + return t.ticker.C +} diff --git a/sdk/metric/example_test.go b/sdk/metric/example_test.go index c4910b9bb..d76766c7e 100644 --- a/sdk/metric/example_test.go +++ b/sdk/metric/example_test.go @@ -17,7 +17,6 @@ package metric_test import ( "context" "fmt" - "time" "go.opentelemetry.io/otel/api/kv" @@ -29,7 +28,7 @@ func ExampleNew() { pusher, err := stdout.NewExportPipeline(stdout.Config{ PrettyPrint: true, DoNotPrintTime: true, // This makes the output deterministic - }, time.Minute) + }) if err != nil { panic(fmt.Sprintln("Could not initialize stdout exporter:", err)) } diff --git a/sdk/metric/integrator/simple/simple.go b/sdk/metric/integrator/simple/simple.go index 123361ff0..28c35ac50 100644 --- a/sdk/metric/integrator/simple/simple.go +++ b/sdk/metric/integrator/simple/simple.go @@ -17,6 +17,7 @@ package simple // import "go.opentelemetry.io/otel/sdk/metric/integrator/simple" import ( "context" "errors" + "sync" "go.opentelemetry.io/otel/api/label" "go.opentelemetry.io/otel/api/metric" @@ -27,9 +28,9 @@ import ( type ( Integrator struct { - selector export.AggregationSelector - batchMap batchMap + export.AggregationSelector stateful bool + batch } batchKey struct { @@ -44,24 +45,26 @@ type ( resource *resource.Resource } - batchMap map[batchKey]batchValue + batch struct { + // RWMutex implements locking for the `CheckpoingSet` interface. + sync.RWMutex + values map[batchKey]batchValue + } ) var _ export.Integrator = &Integrator{} -var _ export.CheckpointSet = batchMap{} +var _ export.CheckpointSet = &batch{} func New(selector export.AggregationSelector, stateful bool) *Integrator { return &Integrator{ - selector: selector, - batchMap: batchMap{}, - stateful: stateful, + AggregationSelector: selector, + stateful: stateful, + batch: batch{ + values: map[batchKey]batchValue{}, + }, } } -func (b *Integrator) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator { - return b.selector.AggregatorFor(descriptor) -} - func (b *Integrator) Process(_ context.Context, record export.Record) error { desc := record.Descriptor() key := batchKey{ @@ -70,7 +73,7 @@ func (b *Integrator) Process(_ context.Context, record export.Record) error { resource: record.Resource().Equivalent(), } agg := record.Aggregator() - value, ok := b.batchMap[key] + value, ok := b.batch.values[key] if ok { // Note: The call to Merge here combines only // identical records. It is required even for a @@ -92,7 +95,7 @@ func (b *Integrator) Process(_ context.Context, record export.Record) error { return err } } - b.batchMap[key] = batchValue{ + b.batch.values[key] = batchValue{ aggregator: agg, labels: record.Labels(), resource: record.Resource(), @@ -101,17 +104,17 @@ func (b *Integrator) Process(_ context.Context, record export.Record) error { } func (b *Integrator) CheckpointSet() export.CheckpointSet { - return b.batchMap + return &b.batch } func (b *Integrator) FinishedCollection() { if !b.stateful { - b.batchMap = batchMap{} + b.batch.values = map[batchKey]batchValue{} } } -func (c batchMap) ForEach(f func(export.Record) error) error { - for key, value := range c { +func (b *batch) ForEach(f func(export.Record) error) error { + for key, value := range b.values { if err := f(export.NewRecord( key.descriptor, value.labels, diff --git a/sdk/metric/integrator/simple/simple_test.go b/sdk/metric/integrator/simple/simple_test.go index 2b43fc8a8..54fecdd7f 100644 --- a/sdk/metric/integrator/simple/simple_test.go +++ b/sdk/metric/integrator/simple/simple_test.go @@ -29,7 +29,7 @@ import ( // These tests use the ../test label encoding. -func TestUngroupedStateless(t *testing.T) { +func TestSimpleStateless(t *testing.T) { ctx := context.Background() b := simple.New(test.NewAggregationSelector(), false) @@ -60,7 +60,6 @@ func TestUngroupedStateless(t *testing.T) { _ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 50)) checkpointSet := b.CheckpointSet() - b.FinishedCollection() records := test.NewOutput(test.SdkEncoder) _ = checkpointSet.ForEach(records.AddTo) @@ -81,17 +80,18 @@ func TestUngroupedStateless(t *testing.T) { "lastvalue.b/C~D&E~F/R~V": 20, // labels2 "lastvalue.b//R~V": 30, // labels3 }, records.Map) + b.FinishedCollection() // Verify that state was reset checkpointSet = b.CheckpointSet() - b.FinishedCollection() _ = checkpointSet.ForEach(func(rec export.Record) error { t.Fatal("Unexpected call") return nil }) + b.FinishedCollection() } -func TestUngroupedStateful(t *testing.T) { +func TestSimpleStateful(t *testing.T) { ctx := context.Background() b := simple.New(test.NewAggregationSelector(), true) @@ -116,12 +116,12 @@ func TestUngroupedStateful(t *testing.T) { // Test that state was NOT reset checkpointSet = b.CheckpointSet() - b.FinishedCollection() records2 := test.NewOutput(test.SdkEncoder) _ = checkpointSet.ForEach(records2.AddTo) require.EqualValues(t, records1.Map, records2.Map) + b.FinishedCollection() // Update and re-checkpoint the original record. _ = caggA.Update(ctx, metric.NewInt64Number(20), &test.CounterADesc) @@ -132,19 +132,18 @@ func TestUngroupedStateful(t *testing.T) { // As yet cagg has not been passed to Integrator.Process. Should // not see an update. checkpointSet = b.CheckpointSet() - b.FinishedCollection() records3 := test.NewOutput(test.SdkEncoder) _ = checkpointSet.ForEach(records3.AddTo) require.EqualValues(t, records1.Map, records3.Map) + b.FinishedCollection() // Now process the second update _ = b.Process(ctx, export.NewRecord(&test.CounterADesc, test.Labels1, test.Resource, caggA)) _ = b.Process(ctx, export.NewRecord(&test.CounterBDesc, test.Labels1, test.Resource, caggB)) checkpointSet = b.CheckpointSet() - b.FinishedCollection() records4 := test.NewOutput(test.SdkEncoder) _ = checkpointSet.ForEach(records4.AddTo) @@ -153,4 +152,5 @@ func TestUngroupedStateful(t *testing.T) { "sum.a/C~D&G~H/R~V": 30, "sum.b/C~D&G~H/R~V": 30, }, records4.Map) + b.FinishedCollection() } diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 8de0953e3..f3939a41d 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -370,6 +370,7 @@ func (m *Accumulator) Collect(ctx context.Context) int { checkpointed := m.collectSyncInstruments(ctx) checkpointed += m.observeAsyncInstruments(ctx) m.currentEpoch++ + return checkpointed }