From 5461669733c8ddb5b4f90a462211af77fda99474 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Wed, 20 May 2020 10:27:26 -0700 Subject: [PATCH] Add a pull controller, use it for Prometheus (#751) * Checkpoint new pull controller * Tests pass * Fix example * Example fix * Add a test * Comment * address MrAlias's feedback --- example/prometheus/main.go | 9 +- exporters/metric/prometheus/example_test.go | 21 +-- exporters/metric/prometheus/prometheus.go | 121 ++++++++---------- .../metric/prometheus/prometheus_test.go | 118 +++++------------ sdk/metric/controller/pull/config.go | 97 ++++++++++++++ sdk/metric/controller/pull/pull.go | 113 ++++++++++++++++ sdk/metric/controller/pull/pull_test.go | 112 ++++++++++++++++ sdk/metric/selector/simple/simple.go | 16 +-- 8 files changed, 416 insertions(+), 191 deletions(-) create mode 100644 sdk/metric/controller/pull/config.go create mode 100644 sdk/metric/controller/pull/pull.go create mode 100644 sdk/metric/controller/pull/pull_test.go diff --git a/example/prometheus/main.go b/example/prometheus/main.go index 117a2b13f..8000dec0e 100644 --- a/example/prometheus/main.go +++ b/example/prometheus/main.go @@ -25,15 +25,14 @@ import ( "go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporters/metric/prometheus" - "go.opentelemetry.io/otel/sdk/metric/controller/push" ) var ( lemonsKey = kv.Key("ex.com/lemons") ) -func initMeter() *push.Controller { - pusher, exporter, err := prometheus.InstallNewPipeline(prometheus.Config{}) +func initMeter() { + exporter, err := prometheus.InstallNewPipeline(prometheus.Config{}) if err != nil { log.Panicf("failed to initialize prometheus exporter %v", err) } @@ -41,12 +40,10 @@ func initMeter() *push.Controller { go func() { _ = http.ListenAndServe(":2222", nil) }() - - return pusher } func main() { - defer initMeter().Stop() + initMeter() meter := global.Meter("ex.com/basic") observerLock := new(sync.RWMutex) diff --git a/exporters/metric/prometheus/example_test.go b/exporters/metric/prometheus/example_test.go index 1a15e38de..9c7092866 100644 --- a/exporters/metric/prometheus/example_test.go +++ b/exporters/metric/prometheus/example_test.go @@ -25,30 +25,22 @@ import ( "go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporters/metric/prometheus" - sdk "go.opentelemetry.io/otel/sdk/metric" - integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) // This test demonstrates that it is relatively difficult to setup a // Prometheus export pipeline: // // 1. The default boundaries are difficult to pass, should be []float instead of []metric.Number -// 2. The push controller doesn't make sense b/c Prometheus is pull-bsaed // -// TODO: Address these issues; add Resources to the test. +// TODO: Address this issue; add Resources to the test. func ExampleNewExportPipeline() { // Create a meter - selector := simple.NewWithHistogramDistribution(nil) - exporter, err := prometheus.NewRawExporter(prometheus.Config{}) + exporter, err := prometheus.NewExportPipeline(prometheus.Config{}) if err != nil { panic(err) } - integrator := integrator.New(selector, true) - meterImpl := sdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(meterImpl, "example") - + meter := exporter.Provider().Meter("example") ctx := context.Background() // Use two instruments @@ -64,13 +56,6 @@ func ExampleNewExportPipeline() { counter.Add(ctx, 100, kv.String("key", "value")) recorder.Record(ctx, 100, kv.String("key", "value")) - // Simulate a push - meterImpl.Collect(ctx) - err = exporter.Export(ctx, integrator.CheckpointSet()) - if err != nil { - panic(err) - } - // GET the HTTP endpoint var input bytes.Buffer resp := httptest.NewRecorder() diff --git a/exporters/metric/prometheus/prometheus.go b/exporters/metric/prometheus/prometheus.go index 44f7fb016..d76a1faf3 100644 --- a/exporters/metric/prometheus/prometheus.go +++ b/exporters/metric/prometheus/prometheus.go @@ -29,27 +29,34 @@ import ( "go.opentelemetry.io/otel/api/label" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" - "go.opentelemetry.io/otel/sdk/metric/controller/push" + "go.opentelemetry.io/otel/sdk/metric/controller/pull" "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) // Exporter is an implementation of metric.Exporter that sends metrics to // Prometheus. +// +// This exporter supports Prometheus pulls, as such it does not +// implement the export.Exporter interface. type Exporter struct { handler http.Handler registerer prometheus.Registerer gatherer prometheus.Gatherer - lock sync.RWMutex - snapshot export.CheckpointSet - onError func(error) + // lock protects access to the controller. The controller + // exposes its own lock, but using a dedicated lock in this + // struct allows the exporter to potentially support multiple + // controllers (e.g., with different resources). + lock sync.RWMutex + controller *pull.Controller + + onError func(error) defaultSummaryQuantiles []float64 defaultHistogramBoundaries []metric.Number } -var _ export.Exporter = &Exporter{} var _ http.Handler = &Exporter{} // Config is a set of configs for the tally reporter. @@ -85,9 +92,9 @@ type Config struct { OnError func(error) } -// NewRawExporter returns a new prometheus exporter for prometheus metrics -// for use in a pipeline. -func NewRawExporter(config Config) (*Exporter, error) { +// NewExportPipeline sets up a complete export pipeline with the recommended setup, +// using the recommended selector and standard integrator. See the pull.Options. +func NewExportPipeline(config Config, options ...pull.Option) (*Exporter, error) { if config.Registry == nil { config.Registry = prometheus.NewRegistry() } @@ -115,9 +122,12 @@ func NewRawExporter(config Config) (*Exporter, error) { onError: config.OnError, } - c := newCollector(e) + c := &collector{ + exp: e, + } + e.SetController(config, options...) if err := config.Registerer.Register(c); err != nil { - config.OnError(fmt.Errorf("cannot register the collector: %w", err)) + return nil, fmt.Errorf("cannot register the collector: %w", err) } return e, nil @@ -126,7 +136,7 @@ func NewRawExporter(config Config) (*Exporter, error) { // InstallNewPipeline instantiates a NewExportPipeline and registers it globally. // Typically called as: // -// pipeline, hf, err := prometheus.InstallNewPipeline(prometheus.Config{...}) +// hf, err := prometheus.InstallNewPipeline(prometheus.Config{...}) // // if err != nil { // ... @@ -134,28 +144,21 @@ func NewRawExporter(config Config) (*Exporter, error) { // http.HandleFunc("/metrics", hf) // defer pipeline.Stop() // ... Done -func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller, *Exporter, error) { - controller, exp, err := NewExportPipeline(config, options...) +func InstallNewPipeline(config Config, options ...pull.Option) (*Exporter, error) { + exp, err := NewExportPipeline(config, options...) if err != nil { - return controller, exp, err + return nil, err } - global.SetMeterProvider(controller.Provider()) - return controller, exp, err + global.SetMeterProvider(exp.Provider()) + return exp, nil } -// NewExportPipeline sets up a complete export pipeline with the recommended setup, -// chaining a NewRawExporter into the recommended selectors and integrators. -// -// The returned Controller contains an implementation of -// `metric.Provider`. The controller is returned unstarted and should -// be started by the caller to begin collection. -func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, *Exporter, error) { - exporter, err := NewRawExporter(config) - if err != nil { - return nil, nil, err - } - - // Prometheus uses a stateful push controller since instruments are +// SetController sets up a standard *pull.Controller as the metric provider +// for this exporter. +func (e *Exporter) SetController(config Config, options ...pull.Option) { + e.lock.Lock() + defer e.lock.Unlock() + // Prometheus uses a stateful pull controller since instruments are // cumulative and should not be reset after each collection interval. // // Prometheus uses this approach to be resilient to scrape failures. @@ -163,22 +166,29 @@ func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, // it could try again on the next scrape and no data would be lost, only resolution. // // Gauges (or LastValues) and Summaries are an exception to this and have different behaviors. - pusher := push.New( + // + // TODO: Prometheus supports "Gauge Histogram" which are + // expressed as delta histograms. + e.controller = pull.New( simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries), - exporter, - append(options, push.WithStateful(true))..., + append(options, pull.WithStateful(true))..., ) - - return pusher, exporter, nil } -// Export exports the provide metric record to prometheus. -func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { - // TODO: Use the resource value in this exporter. - e.lock.Lock() - defer e.lock.Unlock() - e.snapshot = checkpointSet - return nil +// Provider returns the metric.Provider of this exporter. +func (e *Exporter) Provider() metric.Provider { + return e.controller.Provider() +} + +// Controller returns the controller object that coordinates collection for the SDK. +func (e *Exporter) Controller() *pull.Controller { + e.lock.RLock() + defer e.lock.RUnlock() + return e.controller +} + +func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) { + e.handler.ServeHTTP(w, r) } // collector implements prometheus.Collector interface. @@ -188,24 +198,11 @@ type collector struct { var _ prometheus.Collector = (*collector)(nil) -func newCollector(exporter *Exporter) *collector { - return &collector{ - exp: exporter, - } -} - func (c *collector) Describe(ch chan<- *prometheus.Desc) { c.exp.lock.RLock() defer c.exp.lock.RUnlock() - if c.exp.snapshot == nil { - return - } - - c.exp.snapshot.RLock() - defer c.exp.snapshot.RUnlock() - - _ = c.exp.snapshot.ForEach(func(record export.Record) error { + _ = c.exp.Controller().ForEach(func(record export.Record) error { ch <- c.toDesc(&record) return nil }) @@ -219,14 +216,10 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { c.exp.lock.RLock() defer c.exp.lock.RUnlock() - if c.exp.snapshot == nil { - return - } + ctrl := c.exp.Controller() + ctrl.Collect(context.Background()) - c.exp.snapshot.RLock() - defer c.exp.snapshot.RUnlock() - - err := c.exp.snapshot.ForEach(func(record export.Record) error { + err := ctrl.ForEach(func(record export.Record) error { agg := record.Aggregator() numberKind := record.Descriptor().NumberKind() // TODO: Use the resource value in this record. @@ -359,10 +352,6 @@ func (c *collector) toDesc(record *export.Record) *prometheus.Desc { return prometheus.NewDesc(sanitize(desc.Name()), desc.Description(), labels, nil) } -func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) { - e.handler.ServeHTTP(w, r) -} - func labelsKeys(labels *label.Set) []string { iter := labels.Iter() keys := make([]string, 0, iter.Len()) diff --git a/exporters/metric/prometheus/prometheus_test.go b/exporters/metric/prometheus/prometheus_test.go index 254cd1e55..db87e0313 100644 --- a/exporters/metric/prometheus/prometheus_test.go +++ b/exporters/metric/prometheus/prometheus_test.go @@ -20,116 +20,64 @@ import ( "io/ioutil" "net/http" "net/http/httptest" - "runtime" "sort" "strings" "testing" - "time" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporters/metric/prometheus" - exportTest "go.opentelemetry.io/otel/exporters/metric/test" - "go.opentelemetry.io/otel/sdk/metric/controller/push" - controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test" + "go.opentelemetry.io/otel/sdk/metric/controller/pull" ) func TestPrometheusExporter(t *testing.T) { - exporter, err := prometheus.NewRawExporter(prometheus.Config{ - DefaultSummaryQuantiles: []float64{0.5, 0.9, 0.99}, + exporter, err := prometheus.NewExportPipeline(prometheus.Config{ + DefaultHistogramBoundaries: []metric.Number{metric.NewFloat64Number(-0.5), metric.NewFloat64Number(1)}, }) require.NoError(t, err) - var expected []string - checkpointSet := exportTest.NewCheckpointSet(nil) + meter := exporter.Provider().Meter("test") - counter := metric.NewDescriptor( - "counter", metric.CounterKind, metric.Float64NumberKind) - lastValue := metric.NewDescriptor( - "lastvalue", metric.ValueObserverKind, metric.Float64NumberKind) - valuerecorder := metric.NewDescriptor( - "valuerecorder", metric.ValueRecorderKind, metric.Float64NumberKind) - histogramValueRecorder := metric.NewDescriptor( - "histogram_valuerecorder", metric.ValueRecorderKind, metric.Float64NumberKind) + counter := metric.Must(meter).NewFloat64Counter("counter") + valuerecorder := metric.Must(meter).NewFloat64ValueRecorder("valuerecorder") labels := []kv.KeyValue{ kv.Key("A").String("B"), kv.Key("C").String("D"), } + ctx := context.Background() + + var expected []string + + counter.Add(ctx, 10, labels...) + counter.Add(ctx, 5.3, labels...) - checkpointSet.AddCounter(&counter, 15.3, labels...) expected = append(expected, `counter{A="B",C="D"} 15.3`) - checkpointSet.AddLastValue(&lastValue, 13.2, labels...) - expected = append(expected, `lastvalue{A="B",C="D"} 13.2`) + valuerecorder.Record(ctx, -0.6, labels...) + valuerecorder.Record(ctx, -0.4, labels...) + valuerecorder.Record(ctx, 0.6, labels...) + valuerecorder.Record(ctx, 20, labels...) - checkpointSet.AddValueRecorder(&valuerecorder, 13, labels...) - checkpointSet.AddValueRecorder(&valuerecorder, 15, labels...) - checkpointSet.AddValueRecorder(&valuerecorder, 17, labels...) - expected = append(expected, `valuerecorder{A="B",C="D",quantile="0.5"} 15`) - expected = append(expected, `valuerecorder{A="B",C="D",quantile="0.9"} 17`) - expected = append(expected, `valuerecorder{A="B",C="D",quantile="0.99"} 17`) - expected = append(expected, `valuerecorder_sum{A="B",C="D"} 45`) - expected = append(expected, `valuerecorder_count{A="B",C="D"} 3`) + expected = append(expected, `valuerecorder_bucket{A="B",C="D",le="+Inf"} 4`) + expected = append(expected, `valuerecorder_bucket{A="B",C="D",le="-0.5"} 1`) + expected = append(expected, `valuerecorder_bucket{A="B",C="D",le="1"} 3`) + expected = append(expected, `valuerecorder_count{A="B",C="D"} 4`) + expected = append(expected, `valuerecorder_sum{A="B",C="D"} 19.6`) - boundaries := []metric.Number{metric.NewFloat64Number(-0.5), metric.NewFloat64Number(1)} - checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, -0.6, labels...) - checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, -0.4, labels...) - checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, 0.6, labels...) - checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, 20, labels...) - - expected = append(expected, `histogram_valuerecorder_bucket{A="B",C="D",le="+Inf"} 4`) - expected = append(expected, `histogram_valuerecorder_bucket{A="B",C="D",le="-0.5"} 1`) - expected = append(expected, `histogram_valuerecorder_bucket{A="B",C="D",le="1"} 3`) - expected = append(expected, `histogram_valuerecorder_count{A="B",C="D"} 4`) - expected = append(expected, `histogram_valuerecorder_sum{A="B",C="D"} 19.6`) - - missingLabels := []kv.KeyValue{ - kv.Key("A").String("E"), - kv.Key("C").String(""), - } - - checkpointSet.AddCounter(&counter, 12, missingLabels...) - expected = append(expected, `counter{A="E",C=""} 12`) - - checkpointSet.AddLastValue(&lastValue, 32, missingLabels...) - expected = append(expected, `lastvalue{A="E",C=""} 32`) - - checkpointSet.AddValueRecorder(&valuerecorder, 19, missingLabels...) - expected = append(expected, `valuerecorder{A="E",C="",quantile="0.5"} 19`) - expected = append(expected, `valuerecorder{A="E",C="",quantile="0.9"} 19`) - expected = append(expected, `valuerecorder{A="E",C="",quantile="0.99"} 19`) - expected = append(expected, `valuerecorder_count{A="E",C=""} 1`) - expected = append(expected, `valuerecorder_sum{A="E",C=""} 19`) - - boundaries = []metric.Number{metric.NewFloat64Number(0), metric.NewFloat64Number(1)} - checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, -0.6, missingLabels...) - checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, -0.4, missingLabels...) - checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, -0.1, missingLabels...) - checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, 15, missingLabels...) - checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, 15, missingLabels...) - - expected = append(expected, `histogram_valuerecorder_bucket{A="E",C="",le="+Inf"} 5`) - expected = append(expected, `histogram_valuerecorder_bucket{A="E",C="",le="0"} 3`) - expected = append(expected, `histogram_valuerecorder_bucket{A="E",C="",le="1"} 3`) - expected = append(expected, `histogram_valuerecorder_count{A="E",C=""} 5`) - expected = append(expected, `histogram_valuerecorder_sum{A="E",C=""} 28.9`) - - compareExport(t, exporter, checkpointSet, expected) + compareExport(t, exporter, expected) } -func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *exportTest.CheckpointSet, expected []string) { - err := exporter.Export(context.Background(), checkpointSet) - require.Nil(t, err) - +func compareExport(t *testing.T, exporter *prometheus.Exporter, expected []string) { rec := httptest.NewRecorder() req := httptest.NewRequest("GET", "/metrics", nil) exporter.ServeHTTP(rec, req) output := rec.Body.String() lines := strings.Split(output, "\n") + var metricsOnly []string for _, line := range lines { if !strings.HasPrefix(line, "#") && line != "" { @@ -145,13 +93,13 @@ func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *e func TestPrometheusStatefulness(t *testing.T) { // Create a meter - controller, exporter, err := prometheus.NewExportPipeline(prometheus.Config{}, push.WithPeriod(time.Minute)) + exporter, err := prometheus.NewExportPipeline( + prometheus.Config{}, + pull.WithCachePeriod(0), + ) require.NoError(t, err) - meter := controller.Provider().Meter("test") - mock := controllerTest.NewMockClock() - controller.SetClock(mock) - controller.Start() + meter := exporter.Provider().Meter("test") // GET the HTTP endpoint scrape := func() string { @@ -176,10 +124,6 @@ func TestPrometheusStatefulness(t *testing.T) { counter.Add(ctx, 100, kv.String("key", "value")) - // Trigger a push - mock.Add(time.Minute) - runtime.Gosched() - require.Equal(t, `# HELP a_counter Counts things # TYPE a_counter counter a_counter{key="value"} 100 @@ -187,10 +131,6 @@ a_counter{key="value"} 100 counter.Add(ctx, 100, kv.String("key", "value")) - // Again, now expect cumulative count - mock.Add(time.Minute) - runtime.Gosched() - require.Equal(t, `# HELP a_counter Counts things # TYPE a_counter counter a_counter{key="value"} 200 diff --git a/sdk/metric/controller/pull/config.go b/sdk/metric/controller/pull/config.go new file mode 100644 index 000000000..56509fbcc --- /dev/null +++ b/sdk/metric/controller/pull/config.go @@ -0,0 +1,97 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pull + +import ( + "time" + + sdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" +) + +// Config contains configuration for a push Controller. +type Config struct { + // ErrorHandler is the function called when the Controller encounters an error. + // + // This option can be overridden after instantiation of the Controller + // with the `SetErrorHandler` method. + ErrorHandler sdk.ErrorHandler + + // Resource is the OpenTelemetry resource associated with all Meters + // created by the Controller. + Resource *resource.Resource + + // Stateful causes the controller to maintain state across + // collection events, so that records in the exported + // checkpoint set are cumulative. + Stateful bool + + // CachePeriod is the period which a recently-computed result + // will be returned without gathering metric data again. + // + // If the period is zero, caching of the result is disabled. + // The default value is 10 seconds. + CachePeriod time.Duration +} + +// Option is the interface that applies the value to a configuration option. +type Option interface { + // Apply sets the Option value of a Config. + Apply(*Config) +} + +// WithErrorHandler sets the ErrorHandler configuration option of a Config. +func WithErrorHandler(fn sdk.ErrorHandler) Option { + return errorHandlerOption(fn) +} + +type errorHandlerOption sdk.ErrorHandler + +func (o errorHandlerOption) Apply(config *Config) { + config.ErrorHandler = sdk.ErrorHandler(o) +} + +// WithResource sets the Resource configuration option of a Config. +func WithResource(r *resource.Resource) Option { + return resourceOption{r} +} + +type resourceOption struct{ *resource.Resource } + +func (o resourceOption) Apply(config *Config) { + config.Resource = o.Resource +} + +// WithStateful sets the Stateful configuration option of a Config. +func WithStateful(stateful bool) Option { + return statefulOption(stateful) +} + +type statefulOption bool + +func (o statefulOption) Apply(config *Config) { + config.Stateful = bool(o) +} + +// WithCachePeriod sets the CachePeriod configuration option of a Config. +func WithCachePeriod(cachePeriod time.Duration) Option { + return cachePeriodOption(cachePeriod) +} + +type cachePeriodOption time.Duration + +func (o cachePeriodOption) Apply(config *Config) { + config.CachePeriod = time.Duration(o) +} diff --git a/sdk/metric/controller/pull/pull.go b/sdk/metric/controller/pull/pull.go new file mode 100644 index 000000000..eabd7732a --- /dev/null +++ b/sdk/metric/controller/pull/pull.go @@ -0,0 +1,113 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pull // import "go.opentelemetry.io/otel/sdk/metric/controller/pull" + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/api/metric" + "go.opentelemetry.io/otel/api/metric/registry" + export "go.opentelemetry.io/otel/sdk/export/metric" + sdk "go.opentelemetry.io/otel/sdk/metric" + controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time" + integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple" + "go.opentelemetry.io/otel/sdk/resource" +) + +// DefaultCachePeriod determines how long a recently-computed result +// will be returned without gathering metric data again. +const DefaultCachePeriod time.Duration = 10 * time.Second + +// Controller manages access to a *sdk.Accumulator and +// *simple.Integrator. Use Provider() for obtaining Meters. Use +// Foreach() for accessing current records. +type Controller struct { + accumulator *sdk.Accumulator + integrator *integrator.Integrator + provider *registry.Provider + period time.Duration + lastCollect time.Time + clock controllerTime.Clock + checkpoint export.CheckpointSet +} + +// New returns a *Controller configured with an aggregation selector and options. +func New(selector export.AggregationSelector, options ...Option) *Controller { + config := &Config{ + Resource: resource.Empty(), + ErrorHandler: sdk.DefaultErrorHandler, + CachePeriod: DefaultCachePeriod, + } + for _, opt := range options { + opt.Apply(config) + } + integrator := integrator.New(selector, config.Stateful) + accum := sdk.NewAccumulator( + integrator, + sdk.WithResource(config.Resource), + sdk.WithErrorHandler(config.ErrorHandler), + ) + return &Controller{ + accumulator: accum, + integrator: integrator, + provider: registry.NewProvider(accum), + period: config.CachePeriod, + checkpoint: integrator.CheckpointSet(), + clock: controllerTime.RealClock{}, + } +} + +// SetClock sets the clock used for caching. For testing purposes. +func (c *Controller) SetClock(clock controllerTime.Clock) { + c.integrator.Lock() + defer c.integrator.Unlock() + c.clock = clock +} + +// Provider returns a metric.Provider for the implementation managed +// by this controller. +func (c *Controller) Provider() metric.Provider { + return c.provider +} + +// Foreach gives the caller read-locked access to the current +// export.CheckpointSet. +func (c *Controller) ForEach(f func(export.Record) error) error { + c.integrator.RLock() + defer c.integrator.RUnlock() + + return c.checkpoint.ForEach(f) +} + +// Collect requests a collection. The collection will be skipped if +// the last collection is aged less than the CachePeriod. +func (c *Controller) Collect(ctx context.Context) { + c.integrator.Lock() + defer c.integrator.Unlock() + + if c.period > 0 { + now := c.clock.Now() + elapsed := now.Sub(c.lastCollect) + + if elapsed < c.period { + return + } + c.lastCollect = now + } + + c.accumulator.Collect(ctx) + c.checkpoint = c.integrator.CheckpointSet() +} diff --git a/sdk/metric/controller/pull/pull_test.go b/sdk/metric/controller/pull/pull_test.go new file mode 100644 index 000000000..b8d4cb583 --- /dev/null +++ b/sdk/metric/controller/pull/pull_test.go @@ -0,0 +1,112 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pull_test + +import ( + "context" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/api/kv" + "go.opentelemetry.io/otel/api/label" + "go.opentelemetry.io/otel/api/metric" + "go.opentelemetry.io/otel/sdk/metric/controller/pull" + controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test" + "go.opentelemetry.io/otel/sdk/metric/integrator/test" + selector "go.opentelemetry.io/otel/sdk/metric/selector/simple" +) + +func TestPullNoCache(t *testing.T) { + puller := pull.New( + selector.NewWithExactDistribution(), + pull.WithCachePeriod(0), + pull.WithStateful(true), + ) + + ctx := context.Background() + meter := puller.Provider().Meter("nocache") + counter := metric.Must(meter).NewInt64Counter("counter") + + counter.Add(ctx, 10, kv.String("A", "B")) + + puller.Collect(ctx) + records := test.NewOutput(label.DefaultEncoder()) + _ = puller.ForEach(records.AddTo) + + require.EqualValues(t, map[string]float64{ + "counter/A=B/": 10, + }, records.Map) + + counter.Add(ctx, 10, kv.String("A", "B")) + + puller.Collect(ctx) + records = test.NewOutput(label.DefaultEncoder()) + _ = puller.ForEach(records.AddTo) + + require.EqualValues(t, map[string]float64{ + "counter/A=B/": 20, + }, records.Map) +} + +func TestPullWithCache(t *testing.T) { + puller := pull.New( + selector.NewWithExactDistribution(), + pull.WithCachePeriod(time.Second), + pull.WithStateful(true), + ) + mock := controllerTest.NewMockClock() + puller.SetClock(mock) + + ctx := context.Background() + meter := puller.Provider().Meter("nocache") + counter := metric.Must(meter).NewInt64Counter("counter") + + counter.Add(ctx, 10, kv.String("A", "B")) + + puller.Collect(ctx) + records := test.NewOutput(label.DefaultEncoder()) + _ = puller.ForEach(records.AddTo) + + require.EqualValues(t, map[string]float64{ + "counter/A=B/": 10, + }, records.Map) + + counter.Add(ctx, 10, kv.String("A", "B")) + + // Cached value! + puller.Collect(ctx) + records = test.NewOutput(label.DefaultEncoder()) + _ = puller.ForEach(records.AddTo) + + require.EqualValues(t, map[string]float64{ + "counter/A=B/": 10, + }, records.Map) + + mock.Add(time.Second) + runtime.Gosched() + + // Re-computed value! + puller.Collect(ctx) + records = test.NewOutput(label.DefaultEncoder()) + _ = puller.ForEach(records.AddTo) + + require.EqualValues(t, map[string]float64{ + "counter/A=B/": 20, + }, records.Map) + +} diff --git a/sdk/metric/selector/simple/simple.go b/sdk/metric/selector/simple/simple.go index b27d0af47..b64a5e971 100644 --- a/sdk/metric/selector/simple/simple.go +++ b/sdk/metric/selector/simple/simple.go @@ -81,9 +81,7 @@ func NewWithHistogramDistribution(boundaries []metric.Number) export.Aggregation func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator { switch descriptor.MetricKind() { - case metric.ValueObserverKind: - fallthrough - case metric.ValueRecorderKind: + case metric.ValueObserverKind, metric.ValueRecorderKind: return minmaxsumcount.New(descriptor) default: return sum.New() @@ -92,9 +90,7 @@ func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor) export.A func (s selectorSketch) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator { switch descriptor.MetricKind() { - case metric.ValueObserverKind: - fallthrough - case metric.ValueRecorderKind: + case metric.ValueObserverKind, metric.ValueRecorderKind: return ddsketch.New(s.config, descriptor) default: return sum.New() @@ -103,9 +99,7 @@ func (s selectorSketch) AggregatorFor(descriptor *metric.Descriptor) export.Aggr func (selectorExact) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator { switch descriptor.MetricKind() { - case metric.ValueObserverKind: - fallthrough - case metric.ValueRecorderKind: + case metric.ValueObserverKind, metric.ValueRecorderKind: return array.New() default: return sum.New() @@ -114,9 +108,7 @@ func (selectorExact) AggregatorFor(descriptor *metric.Descriptor) export.Aggrega func (s selectorHistogram) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator { switch descriptor.MetricKind() { - case metric.ValueObserverKind: - fallthrough - case metric.ValueRecorderKind: + case metric.ValueObserverKind, metric.ValueRecorderKind: return histogram.New(descriptor, s.boundaries) default: return sum.New()