1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2024-12-12 10:04:29 +02:00

Add a pull controller, use it for Prometheus (#751)

* Checkpoint new pull controller

* Tests pass

* Fix example

* Example fix

* Add a test

* Comment

* address MrAlias's feedback
This commit is contained in:
Joshua MacDonald 2020-05-20 10:27:26 -07:00 committed by GitHub
parent 15e8edd498
commit 5461669733
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 416 additions and 191 deletions

View File

@ -25,15 +25,14 @@ import (
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/prometheus"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
)
var (
lemonsKey = kv.Key("ex.com/lemons")
)
func initMeter() *push.Controller {
pusher, exporter, err := prometheus.InstallNewPipeline(prometheus.Config{})
func initMeter() {
exporter, err := prometheus.InstallNewPipeline(prometheus.Config{})
if err != nil {
log.Panicf("failed to initialize prometheus exporter %v", err)
}
@ -41,12 +40,10 @@ func initMeter() *push.Controller {
go func() {
_ = http.ListenAndServe(":2222", nil)
}()
return pusher
}
func main() {
defer initMeter().Stop()
initMeter()
meter := global.Meter("ex.com/basic")
observerLock := new(sync.RWMutex)

View File

@ -25,30 +25,22 @@ import (
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/prometheus"
sdk "go.opentelemetry.io/otel/sdk/metric"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)
// This test demonstrates that it is relatively difficult to setup a
// Prometheus export pipeline:
//
// 1. The default boundaries are difficult to pass, should be []float instead of []metric.Number
// 2. The push controller doesn't make sense b/c Prometheus is pull-bsaed
//
// TODO: Address these issues; add Resources to the test.
// TODO: Address this issue; add Resources to the test.
func ExampleNewExportPipeline() {
// Create a meter
selector := simple.NewWithHistogramDistribution(nil)
exporter, err := prometheus.NewRawExporter(prometheus.Config{})
exporter, err := prometheus.NewExportPipeline(prometheus.Config{})
if err != nil {
panic(err)
}
integrator := integrator.New(selector, true)
meterImpl := sdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(meterImpl, "example")
meter := exporter.Provider().Meter("example")
ctx := context.Background()
// Use two instruments
@ -64,13 +56,6 @@ func ExampleNewExportPipeline() {
counter.Add(ctx, 100, kv.String("key", "value"))
recorder.Record(ctx, 100, kv.String("key", "value"))
// Simulate a push
meterImpl.Collect(ctx)
err = exporter.Export(ctx, integrator.CheckpointSet())
if err != nil {
panic(err)
}
// GET the HTTP endpoint
var input bytes.Buffer
resp := httptest.NewRecorder()

View File

@ -29,27 +29,34 @@ import (
"go.opentelemetry.io/otel/api/label"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/controller/pull"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)
// Exporter is an implementation of metric.Exporter that sends metrics to
// Prometheus.
//
// This exporter supports Prometheus pulls, as such it does not
// implement the export.Exporter interface.
type Exporter struct {
handler http.Handler
registerer prometheus.Registerer
gatherer prometheus.Gatherer
// lock protects access to the controller. The controller
// exposes its own lock, but using a dedicated lock in this
// struct allows the exporter to potentially support multiple
// controllers (e.g., with different resources).
lock sync.RWMutex
snapshot export.CheckpointSet
controller *pull.Controller
onError func(error)
defaultSummaryQuantiles []float64
defaultHistogramBoundaries []metric.Number
}
var _ export.Exporter = &Exporter{}
var _ http.Handler = &Exporter{}
// Config is a set of configs for the tally reporter.
@ -85,9 +92,9 @@ type Config struct {
OnError func(error)
}
// NewRawExporter returns a new prometheus exporter for prometheus metrics
// for use in a pipeline.
func NewRawExporter(config Config) (*Exporter, error) {
// NewExportPipeline sets up a complete export pipeline with the recommended setup,
// using the recommended selector and standard integrator. See the pull.Options.
func NewExportPipeline(config Config, options ...pull.Option) (*Exporter, error) {
if config.Registry == nil {
config.Registry = prometheus.NewRegistry()
}
@ -115,9 +122,12 @@ func NewRawExporter(config Config) (*Exporter, error) {
onError: config.OnError,
}
c := newCollector(e)
c := &collector{
exp: e,
}
e.SetController(config, options...)
if err := config.Registerer.Register(c); err != nil {
config.OnError(fmt.Errorf("cannot register the collector: %w", err))
return nil, fmt.Errorf("cannot register the collector: %w", err)
}
return e, nil
@ -126,7 +136,7 @@ func NewRawExporter(config Config) (*Exporter, error) {
// InstallNewPipeline instantiates a NewExportPipeline and registers it globally.
// Typically called as:
//
// pipeline, hf, err := prometheus.InstallNewPipeline(prometheus.Config{...})
// hf, err := prometheus.InstallNewPipeline(prometheus.Config{...})
//
// if err != nil {
// ...
@ -134,28 +144,21 @@ func NewRawExporter(config Config) (*Exporter, error) {
// http.HandleFunc("/metrics", hf)
// defer pipeline.Stop()
// ... Done
func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller, *Exporter, error) {
controller, exp, err := NewExportPipeline(config, options...)
func InstallNewPipeline(config Config, options ...pull.Option) (*Exporter, error) {
exp, err := NewExportPipeline(config, options...)
if err != nil {
return controller, exp, err
return nil, err
}
global.SetMeterProvider(controller.Provider())
return controller, exp, err
global.SetMeterProvider(exp.Provider())
return exp, nil
}
// NewExportPipeline sets up a complete export pipeline with the recommended setup,
// chaining a NewRawExporter into the recommended selectors and integrators.
//
// The returned Controller contains an implementation of
// `metric.Provider`. The controller is returned unstarted and should
// be started by the caller to begin collection.
func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, *Exporter, error) {
exporter, err := NewRawExporter(config)
if err != nil {
return nil, nil, err
}
// Prometheus uses a stateful push controller since instruments are
// SetController sets up a standard *pull.Controller as the metric provider
// for this exporter.
func (e *Exporter) SetController(config Config, options ...pull.Option) {
e.lock.Lock()
defer e.lock.Unlock()
// Prometheus uses a stateful pull controller since instruments are
// cumulative and should not be reset after each collection interval.
//
// Prometheus uses this approach to be resilient to scrape failures.
@ -163,22 +166,29 @@ func NewExportPipeline(config Config, options ...push.Option) (*push.Controller,
// it could try again on the next scrape and no data would be lost, only resolution.
//
// Gauges (or LastValues) and Summaries are an exception to this and have different behaviors.
pusher := push.New(
//
// TODO: Prometheus supports "Gauge Histogram" which are
// expressed as delta histograms.
e.controller = pull.New(
simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries),
exporter,
append(options, push.WithStateful(true))...,
append(options, pull.WithStateful(true))...,
)
return pusher, exporter, nil
}
// Export exports the provide metric record to prometheus.
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
// TODO: Use the resource value in this exporter.
e.lock.Lock()
defer e.lock.Unlock()
e.snapshot = checkpointSet
return nil
// Provider returns the metric.Provider of this exporter.
func (e *Exporter) Provider() metric.Provider {
return e.controller.Provider()
}
// Controller returns the controller object that coordinates collection for the SDK.
func (e *Exporter) Controller() *pull.Controller {
e.lock.RLock()
defer e.lock.RUnlock()
return e.controller
}
func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
e.handler.ServeHTTP(w, r)
}
// collector implements prometheus.Collector interface.
@ -188,24 +198,11 @@ type collector struct {
var _ prometheus.Collector = (*collector)(nil)
func newCollector(exporter *Exporter) *collector {
return &collector{
exp: exporter,
}
}
func (c *collector) Describe(ch chan<- *prometheus.Desc) {
c.exp.lock.RLock()
defer c.exp.lock.RUnlock()
if c.exp.snapshot == nil {
return
}
c.exp.snapshot.RLock()
defer c.exp.snapshot.RUnlock()
_ = c.exp.snapshot.ForEach(func(record export.Record) error {
_ = c.exp.Controller().ForEach(func(record export.Record) error {
ch <- c.toDesc(&record)
return nil
})
@ -219,14 +216,10 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
c.exp.lock.RLock()
defer c.exp.lock.RUnlock()
if c.exp.snapshot == nil {
return
}
ctrl := c.exp.Controller()
ctrl.Collect(context.Background())
c.exp.snapshot.RLock()
defer c.exp.snapshot.RUnlock()
err := c.exp.snapshot.ForEach(func(record export.Record) error {
err := ctrl.ForEach(func(record export.Record) error {
agg := record.Aggregator()
numberKind := record.Descriptor().NumberKind()
// TODO: Use the resource value in this record.
@ -359,10 +352,6 @@ func (c *collector) toDesc(record *export.Record) *prometheus.Desc {
return prometheus.NewDesc(sanitize(desc.Name()), desc.Description(), labels, nil)
}
func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
e.handler.ServeHTTP(w, r)
}
func labelsKeys(labels *label.Set) []string {
iter := labels.Iter()
keys := make([]string, 0, iter.Len())

View File

@ -20,116 +20,64 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"runtime"
"sort"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/prometheus"
exportTest "go.opentelemetry.io/otel/exporters/metric/test"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test"
"go.opentelemetry.io/otel/sdk/metric/controller/pull"
)
func TestPrometheusExporter(t *testing.T) {
exporter, err := prometheus.NewRawExporter(prometheus.Config{
DefaultSummaryQuantiles: []float64{0.5, 0.9, 0.99},
exporter, err := prometheus.NewExportPipeline(prometheus.Config{
DefaultHistogramBoundaries: []metric.Number{metric.NewFloat64Number(-0.5), metric.NewFloat64Number(1)},
})
require.NoError(t, err)
var expected []string
checkpointSet := exportTest.NewCheckpointSet(nil)
meter := exporter.Provider().Meter("test")
counter := metric.NewDescriptor(
"counter", metric.CounterKind, metric.Float64NumberKind)
lastValue := metric.NewDescriptor(
"lastvalue", metric.ValueObserverKind, metric.Float64NumberKind)
valuerecorder := metric.NewDescriptor(
"valuerecorder", metric.ValueRecorderKind, metric.Float64NumberKind)
histogramValueRecorder := metric.NewDescriptor(
"histogram_valuerecorder", metric.ValueRecorderKind, metric.Float64NumberKind)
counter := metric.Must(meter).NewFloat64Counter("counter")
valuerecorder := metric.Must(meter).NewFloat64ValueRecorder("valuerecorder")
labels := []kv.KeyValue{
kv.Key("A").String("B"),
kv.Key("C").String("D"),
}
ctx := context.Background()
var expected []string
counter.Add(ctx, 10, labels...)
counter.Add(ctx, 5.3, labels...)
checkpointSet.AddCounter(&counter, 15.3, labels...)
expected = append(expected, `counter{A="B",C="D"} 15.3`)
checkpointSet.AddLastValue(&lastValue, 13.2, labels...)
expected = append(expected, `lastvalue{A="B",C="D"} 13.2`)
valuerecorder.Record(ctx, -0.6, labels...)
valuerecorder.Record(ctx, -0.4, labels...)
valuerecorder.Record(ctx, 0.6, labels...)
valuerecorder.Record(ctx, 20, labels...)
checkpointSet.AddValueRecorder(&valuerecorder, 13, labels...)
checkpointSet.AddValueRecorder(&valuerecorder, 15, labels...)
checkpointSet.AddValueRecorder(&valuerecorder, 17, labels...)
expected = append(expected, `valuerecorder{A="B",C="D",quantile="0.5"} 15`)
expected = append(expected, `valuerecorder{A="B",C="D",quantile="0.9"} 17`)
expected = append(expected, `valuerecorder{A="B",C="D",quantile="0.99"} 17`)
expected = append(expected, `valuerecorder_sum{A="B",C="D"} 45`)
expected = append(expected, `valuerecorder_count{A="B",C="D"} 3`)
expected = append(expected, `valuerecorder_bucket{A="B",C="D",le="+Inf"} 4`)
expected = append(expected, `valuerecorder_bucket{A="B",C="D",le="-0.5"} 1`)
expected = append(expected, `valuerecorder_bucket{A="B",C="D",le="1"} 3`)
expected = append(expected, `valuerecorder_count{A="B",C="D"} 4`)
expected = append(expected, `valuerecorder_sum{A="B",C="D"} 19.6`)
boundaries := []metric.Number{metric.NewFloat64Number(-0.5), metric.NewFloat64Number(1)}
checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, -0.6, labels...)
checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, -0.4, labels...)
checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, 0.6, labels...)
checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, 20, labels...)
expected = append(expected, `histogram_valuerecorder_bucket{A="B",C="D",le="+Inf"} 4`)
expected = append(expected, `histogram_valuerecorder_bucket{A="B",C="D",le="-0.5"} 1`)
expected = append(expected, `histogram_valuerecorder_bucket{A="B",C="D",le="1"} 3`)
expected = append(expected, `histogram_valuerecorder_count{A="B",C="D"} 4`)
expected = append(expected, `histogram_valuerecorder_sum{A="B",C="D"} 19.6`)
missingLabels := []kv.KeyValue{
kv.Key("A").String("E"),
kv.Key("C").String(""),
compareExport(t, exporter, expected)
}
checkpointSet.AddCounter(&counter, 12, missingLabels...)
expected = append(expected, `counter{A="E",C=""} 12`)
checkpointSet.AddLastValue(&lastValue, 32, missingLabels...)
expected = append(expected, `lastvalue{A="E",C=""} 32`)
checkpointSet.AddValueRecorder(&valuerecorder, 19, missingLabels...)
expected = append(expected, `valuerecorder{A="E",C="",quantile="0.5"} 19`)
expected = append(expected, `valuerecorder{A="E",C="",quantile="0.9"} 19`)
expected = append(expected, `valuerecorder{A="E",C="",quantile="0.99"} 19`)
expected = append(expected, `valuerecorder_count{A="E",C=""} 1`)
expected = append(expected, `valuerecorder_sum{A="E",C=""} 19`)
boundaries = []metric.Number{metric.NewFloat64Number(0), metric.NewFloat64Number(1)}
checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, -0.6, missingLabels...)
checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, -0.4, missingLabels...)
checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, -0.1, missingLabels...)
checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, 15, missingLabels...)
checkpointSet.AddHistogramValueRecorder(&histogramValueRecorder, boundaries, 15, missingLabels...)
expected = append(expected, `histogram_valuerecorder_bucket{A="E",C="",le="+Inf"} 5`)
expected = append(expected, `histogram_valuerecorder_bucket{A="E",C="",le="0"} 3`)
expected = append(expected, `histogram_valuerecorder_bucket{A="E",C="",le="1"} 3`)
expected = append(expected, `histogram_valuerecorder_count{A="E",C=""} 5`)
expected = append(expected, `histogram_valuerecorder_sum{A="E",C=""} 28.9`)
compareExport(t, exporter, checkpointSet, expected)
}
func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *exportTest.CheckpointSet, expected []string) {
err := exporter.Export(context.Background(), checkpointSet)
require.Nil(t, err)
func compareExport(t *testing.T, exporter *prometheus.Exporter, expected []string) {
rec := httptest.NewRecorder()
req := httptest.NewRequest("GET", "/metrics", nil)
exporter.ServeHTTP(rec, req)
output := rec.Body.String()
lines := strings.Split(output, "\n")
var metricsOnly []string
for _, line := range lines {
if !strings.HasPrefix(line, "#") && line != "" {
@ -145,13 +93,13 @@ func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *e
func TestPrometheusStatefulness(t *testing.T) {
// Create a meter
controller, exporter, err := prometheus.NewExportPipeline(prometheus.Config{}, push.WithPeriod(time.Minute))
exporter, err := prometheus.NewExportPipeline(
prometheus.Config{},
pull.WithCachePeriod(0),
)
require.NoError(t, err)
meter := controller.Provider().Meter("test")
mock := controllerTest.NewMockClock()
controller.SetClock(mock)
controller.Start()
meter := exporter.Provider().Meter("test")
// GET the HTTP endpoint
scrape := func() string {
@ -176,10 +124,6 @@ func TestPrometheusStatefulness(t *testing.T) {
counter.Add(ctx, 100, kv.String("key", "value"))
// Trigger a push
mock.Add(time.Minute)
runtime.Gosched()
require.Equal(t, `# HELP a_counter Counts things
# TYPE a_counter counter
a_counter{key="value"} 100
@ -187,10 +131,6 @@ a_counter{key="value"} 100
counter.Add(ctx, 100, kv.String("key", "value"))
// Again, now expect cumulative count
mock.Add(time.Minute)
runtime.Gosched()
require.Equal(t, `# HELP a_counter Counts things
# TYPE a_counter counter
a_counter{key="value"} 200

View File

@ -0,0 +1,97 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pull
import (
"time"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)
// Config contains configuration for a push Controller.
type Config struct {
// ErrorHandler is the function called when the Controller encounters an error.
//
// This option can be overridden after instantiation of the Controller
// with the `SetErrorHandler` method.
ErrorHandler sdk.ErrorHandler
// Resource is the OpenTelemetry resource associated with all Meters
// created by the Controller.
Resource *resource.Resource
// Stateful causes the controller to maintain state across
// collection events, so that records in the exported
// checkpoint set are cumulative.
Stateful bool
// CachePeriod is the period which a recently-computed result
// will be returned without gathering metric data again.
//
// If the period is zero, caching of the result is disabled.
// The default value is 10 seconds.
CachePeriod time.Duration
}
// Option is the interface that applies the value to a configuration option.
type Option interface {
// Apply sets the Option value of a Config.
Apply(*Config)
}
// WithErrorHandler sets the ErrorHandler configuration option of a Config.
func WithErrorHandler(fn sdk.ErrorHandler) Option {
return errorHandlerOption(fn)
}
type errorHandlerOption sdk.ErrorHandler
func (o errorHandlerOption) Apply(config *Config) {
config.ErrorHandler = sdk.ErrorHandler(o)
}
// WithResource sets the Resource configuration option of a Config.
func WithResource(r *resource.Resource) Option {
return resourceOption{r}
}
type resourceOption struct{ *resource.Resource }
func (o resourceOption) Apply(config *Config) {
config.Resource = o.Resource
}
// WithStateful sets the Stateful configuration option of a Config.
func WithStateful(stateful bool) Option {
return statefulOption(stateful)
}
type statefulOption bool
func (o statefulOption) Apply(config *Config) {
config.Stateful = bool(o)
}
// WithCachePeriod sets the CachePeriod configuration option of a Config.
func WithCachePeriod(cachePeriod time.Duration) Option {
return cachePeriodOption(cachePeriod)
}
type cachePeriodOption time.Duration
func (o cachePeriodOption) Apply(config *Config) {
config.CachePeriod = time.Duration(o)
}

View File

@ -0,0 +1,113 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pull // import "go.opentelemetry.io/otel/sdk/metric/controller/pull"
import (
"context"
"time"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/metric/registry"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/resource"
)
// DefaultCachePeriod determines how long a recently-computed result
// will be returned without gathering metric data again.
const DefaultCachePeriod time.Duration = 10 * time.Second
// Controller manages access to a *sdk.Accumulator and
// *simple.Integrator. Use Provider() for obtaining Meters. Use
// Foreach() for accessing current records.
type Controller struct {
accumulator *sdk.Accumulator
integrator *integrator.Integrator
provider *registry.Provider
period time.Duration
lastCollect time.Time
clock controllerTime.Clock
checkpoint export.CheckpointSet
}
// New returns a *Controller configured with an aggregation selector and options.
func New(selector export.AggregationSelector, options ...Option) *Controller {
config := &Config{
Resource: resource.Empty(),
ErrorHandler: sdk.DefaultErrorHandler,
CachePeriod: DefaultCachePeriod,
}
for _, opt := range options {
opt.Apply(config)
}
integrator := integrator.New(selector, config.Stateful)
accum := sdk.NewAccumulator(
integrator,
sdk.WithResource(config.Resource),
sdk.WithErrorHandler(config.ErrorHandler),
)
return &Controller{
accumulator: accum,
integrator: integrator,
provider: registry.NewProvider(accum),
period: config.CachePeriod,
checkpoint: integrator.CheckpointSet(),
clock: controllerTime.RealClock{},
}
}
// SetClock sets the clock used for caching. For testing purposes.
func (c *Controller) SetClock(clock controllerTime.Clock) {
c.integrator.Lock()
defer c.integrator.Unlock()
c.clock = clock
}
// Provider returns a metric.Provider for the implementation managed
// by this controller.
func (c *Controller) Provider() metric.Provider {
return c.provider
}
// Foreach gives the caller read-locked access to the current
// export.CheckpointSet.
func (c *Controller) ForEach(f func(export.Record) error) error {
c.integrator.RLock()
defer c.integrator.RUnlock()
return c.checkpoint.ForEach(f)
}
// Collect requests a collection. The collection will be skipped if
// the last collection is aged less than the CachePeriod.
func (c *Controller) Collect(ctx context.Context) {
c.integrator.Lock()
defer c.integrator.Unlock()
if c.period > 0 {
now := c.clock.Now()
elapsed := now.Sub(c.lastCollect)
if elapsed < c.period {
return
}
c.lastCollect = now
}
c.accumulator.Collect(ctx)
c.checkpoint = c.integrator.CheckpointSet()
}

View File

@ -0,0 +1,112 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pull_test
import (
"context"
"runtime"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/sdk/metric/controller/pull"
controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
selector "go.opentelemetry.io/otel/sdk/metric/selector/simple"
)
func TestPullNoCache(t *testing.T) {
puller := pull.New(
selector.NewWithExactDistribution(),
pull.WithCachePeriod(0),
pull.WithStateful(true),
)
ctx := context.Background()
meter := puller.Provider().Meter("nocache")
counter := metric.Must(meter).NewInt64Counter("counter")
counter.Add(ctx, 10, kv.String("A", "B"))
puller.Collect(ctx)
records := test.NewOutput(label.DefaultEncoder())
_ = puller.ForEach(records.AddTo)
require.EqualValues(t, map[string]float64{
"counter/A=B/": 10,
}, records.Map)
counter.Add(ctx, 10, kv.String("A", "B"))
puller.Collect(ctx)
records = test.NewOutput(label.DefaultEncoder())
_ = puller.ForEach(records.AddTo)
require.EqualValues(t, map[string]float64{
"counter/A=B/": 20,
}, records.Map)
}
func TestPullWithCache(t *testing.T) {
puller := pull.New(
selector.NewWithExactDistribution(),
pull.WithCachePeriod(time.Second),
pull.WithStateful(true),
)
mock := controllerTest.NewMockClock()
puller.SetClock(mock)
ctx := context.Background()
meter := puller.Provider().Meter("nocache")
counter := metric.Must(meter).NewInt64Counter("counter")
counter.Add(ctx, 10, kv.String("A", "B"))
puller.Collect(ctx)
records := test.NewOutput(label.DefaultEncoder())
_ = puller.ForEach(records.AddTo)
require.EqualValues(t, map[string]float64{
"counter/A=B/": 10,
}, records.Map)
counter.Add(ctx, 10, kv.String("A", "B"))
// Cached value!
puller.Collect(ctx)
records = test.NewOutput(label.DefaultEncoder())
_ = puller.ForEach(records.AddTo)
require.EqualValues(t, map[string]float64{
"counter/A=B/": 10,
}, records.Map)
mock.Add(time.Second)
runtime.Gosched()
// Re-computed value!
puller.Collect(ctx)
records = test.NewOutput(label.DefaultEncoder())
_ = puller.ForEach(records.AddTo)
require.EqualValues(t, map[string]float64{
"counter/A=B/": 20,
}, records.Map)
}

View File

@ -81,9 +81,7 @@ func NewWithHistogramDistribution(boundaries []metric.Number) export.Aggregation
func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
switch descriptor.MetricKind() {
case metric.ValueObserverKind:
fallthrough
case metric.ValueRecorderKind:
case metric.ValueObserverKind, metric.ValueRecorderKind:
return minmaxsumcount.New(descriptor)
default:
return sum.New()
@ -92,9 +90,7 @@ func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor) export.A
func (s selectorSketch) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
switch descriptor.MetricKind() {
case metric.ValueObserverKind:
fallthrough
case metric.ValueRecorderKind:
case metric.ValueObserverKind, metric.ValueRecorderKind:
return ddsketch.New(s.config, descriptor)
default:
return sum.New()
@ -103,9 +99,7 @@ func (s selectorSketch) AggregatorFor(descriptor *metric.Descriptor) export.Aggr
func (selectorExact) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
switch descriptor.MetricKind() {
case metric.ValueObserverKind:
fallthrough
case metric.ValueRecorderKind:
case metric.ValueObserverKind, metric.ValueRecorderKind:
return array.New()
default:
return sum.New()
@ -114,9 +108,7 @@ func (selectorExact) AggregatorFor(descriptor *metric.Descriptor) export.Aggrega
func (s selectorHistogram) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
switch descriptor.MetricKind() {
case metric.ValueObserverKind:
fallthrough
case metric.ValueRecorderKind:
case metric.ValueObserverKind, metric.ValueRecorderKind:
return histogram.New(descriptor, s.boundaries)
default:
return sum.New()