1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-10-31 00:07:40 +02:00

Refactor the api/metrics push controller; add CheckpointSet synchronization (#737)

* Checkpoint

* Finish tests

* Checkpoint

* Checkpoint (builds)

* Checkpoint + RWMutex interface

* Comments

* Remove commitLock

* Apply feedback
This commit is contained in:
Joshua MacDonald
2020-05-18 18:37:41 -07:00
committed by GitHub
parent 69da3056f2
commit 21d094af43
18 changed files with 366 additions and 254 deletions

View File

@@ -33,11 +33,11 @@ var (
) )
func initMeter() *push.Controller { func initMeter() *push.Controller {
pusher, hf, err := prometheus.InstallNewPipeline(prometheus.Config{}) pusher, exporter, err := prometheus.InstallNewPipeline(prometheus.Config{})
if err != nil { if err != nil {
log.Panicf("failed to initialize prometheus exporter %v", err) log.Panicf("failed to initialize prometheus exporter %v", err)
} }
http.HandleFunc("/", hf) http.HandleFunc("/", exporter.ServeHTTP)
go func() { go func() {
_ = http.ListenAndServe(":2222", nil) _ = http.ListenAndServe(":2222", nil)
}() }()

View File

@@ -18,7 +18,7 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"time" "sync"
"go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/api/metric"
@@ -30,7 +30,6 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric" export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator" "go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/controller/push" "go.opentelemetry.io/otel/sdk/metric/controller/push"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/selector/simple" "go.opentelemetry.io/otel/sdk/metric/selector/simple"
) )
@@ -42,6 +41,7 @@ type Exporter struct {
registerer prometheus.Registerer registerer prometheus.Registerer
gatherer prometheus.Gatherer gatherer prometheus.Gatherer
lock sync.RWMutex
snapshot export.CheckpointSet snapshot export.CheckpointSet
onError func(error) onError func(error)
@@ -134,41 +134,49 @@ func NewRawExporter(config Config) (*Exporter, error) {
// http.HandleFunc("/metrics", hf) // http.HandleFunc("/metrics", hf)
// defer pipeline.Stop() // defer pipeline.Stop()
// ... Done // ... Done
func InstallNewPipeline(config Config) (*push.Controller, http.HandlerFunc, error) { func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller, *Exporter, error) {
controller, hf, err := NewExportPipeline(config, time.Minute) controller, exp, err := NewExportPipeline(config, options...)
if err != nil { if err != nil {
return controller, hf, err return controller, exp, err
} }
global.SetMeterProvider(controller.Provider()) global.SetMeterProvider(controller.Provider())
return controller, hf, err return controller, exp, err
} }
// NewExportPipeline sets up a complete export pipeline with the recommended setup, // NewExportPipeline sets up a complete export pipeline with the recommended setup,
// chaining a NewRawExporter into the recommended selectors and integrators. // chaining a NewRawExporter into the recommended selectors and integrators.
func NewExportPipeline(config Config, period time.Duration) (*push.Controller, http.HandlerFunc, error) { //
selector := simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries) // The returned Controller contains an implementation of
// `metric.Provider`. The controller is returned unstarted and should
// be started by the caller to begin collection.
func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, *Exporter, error) {
exporter, err := NewRawExporter(config) exporter, err := NewRawExporter(config)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
// Prometheus needs to use a stateful integrator since counters (and histogram since they are a collection of Counters) // Prometheus uses a stateful push controller since instruments are
// are cumulative (i.e., monotonically increasing values) and should not be resetted after each export. // cumulative and should not be reset after each collection interval.
// //
// Prometheus uses this approach to be resilient to scrape failures. // Prometheus uses this approach to be resilient to scrape failures.
// If a Prometheus server tries to scrape metrics from a host and fails for some reason, // If a Prometheus server tries to scrape metrics from a host and fails for some reason,
// it could try again on the next scrape and no data would be lost, only resolution. // it could try again on the next scrape and no data would be lost, only resolution.
// //
// Gauges (or LastValues) and Summaries are an exception to this and have different behaviors. // Gauges (or LastValues) and Summaries are an exception to this and have different behaviors.
integrator := integrator.New(selector, true) pusher := push.New(
pusher := push.New(integrator, exporter, period) simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries),
pusher.Start() exporter,
append(options, push.WithStateful(true))...,
)
return pusher, exporter.ServeHTTP, nil return pusher, exporter, nil
} }
// Export exports the provide metric record to prometheus. // Export exports the provide metric record to prometheus.
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
// TODO: Use the resource value in this exporter.
e.lock.Lock()
defer e.lock.Unlock()
e.snapshot = checkpointSet e.snapshot = checkpointSet
return nil return nil
} }
@@ -187,10 +195,16 @@ func newCollector(exporter *Exporter) *collector {
} }
func (c *collector) Describe(ch chan<- *prometheus.Desc) { func (c *collector) Describe(ch chan<- *prometheus.Desc) {
c.exp.lock.RLock()
defer c.exp.lock.RUnlock()
if c.exp.snapshot == nil { if c.exp.snapshot == nil {
return return
} }
c.exp.snapshot.RLock()
defer c.exp.snapshot.RUnlock()
_ = c.exp.snapshot.ForEach(func(record export.Record) error { _ = c.exp.snapshot.ForEach(func(record export.Record) error {
ch <- c.toDesc(&record) ch <- c.toDesc(&record)
return nil return nil
@@ -202,10 +216,16 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {
// Collect is invoked whenever prometheus.Gatherer is also invoked. // Collect is invoked whenever prometheus.Gatherer is also invoked.
// For example, when the HTTP endpoint is invoked by Prometheus. // For example, when the HTTP endpoint is invoked by Prometheus.
func (c *collector) Collect(ch chan<- prometheus.Metric) { func (c *collector) Collect(ch chan<- prometheus.Metric) {
c.exp.lock.RLock()
defer c.exp.lock.RUnlock()
if c.exp.snapshot == nil { if c.exp.snapshot == nil {
return return
} }
c.exp.snapshot.RLock()
defer c.exp.snapshot.RUnlock()
err := c.exp.snapshot.ForEach(func(record export.Record) error { err := c.exp.snapshot.ForEach(func(record export.Record) error {
agg := record.Aggregator() agg := record.Aggregator()
numberKind := record.Descriptor().NumberKind() numberKind := record.Descriptor().NumberKind()

View File

@@ -15,31 +15,35 @@
package prometheus_test package prometheus_test
import ( import (
"bytes"
"context" "context"
"log" "io/ioutil"
"net/http"
"net/http/httptest" "net/http/httptest"
"runtime"
"sort" "sort"
"strings" "strings"
"testing" "testing"
"time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/prometheus" "go.opentelemetry.io/otel/exporters/metric/prometheus"
"go.opentelemetry.io/otel/exporters/metric/test" exportTest "go.opentelemetry.io/otel/exporters/metric/test"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test"
) )
func TestPrometheusExporter(t *testing.T) { func TestPrometheusExporter(t *testing.T) {
exporter, err := prometheus.NewRawExporter(prometheus.Config{ exporter, err := prometheus.NewRawExporter(prometheus.Config{
DefaultSummaryQuantiles: []float64{0.5, 0.9, 0.99}, DefaultSummaryQuantiles: []float64{0.5, 0.9, 0.99},
}) })
if err != nil { require.NoError(t, err)
log.Panicf("failed to initialize prometheus exporter %v", err)
}
var expected []string var expected []string
checkpointSet := test.NewCheckpointSet(nil) checkpointSet := exportTest.NewCheckpointSet(nil)
counter := metric.NewDescriptor( counter := metric.NewDescriptor(
"counter", metric.CounterKind, metric.Float64NumberKind) "counter", metric.CounterKind, metric.Float64NumberKind)
@@ -116,7 +120,7 @@ func TestPrometheusExporter(t *testing.T) {
compareExport(t, exporter, checkpointSet, expected) compareExport(t, exporter, checkpointSet, expected)
} }
func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *test.CheckpointSet, expected []string) { func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *exportTest.CheckpointSet, expected []string) {
err := exporter.Export(context.Background(), checkpointSet) err := exporter.Export(context.Background(), checkpointSet)
require.Nil(t, err) require.Nil(t, err)
@@ -138,3 +142,58 @@ func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *t
require.Equal(t, strings.Join(expected, "\n"), strings.Join(metricsOnly, "\n")) require.Equal(t, strings.Join(expected, "\n"), strings.Join(metricsOnly, "\n"))
} }
func TestPrometheusStatefulness(t *testing.T) {
// Create a meter
controller, exporter, err := prometheus.NewExportPipeline(prometheus.Config{}, push.WithPeriod(time.Minute))
require.NoError(t, err)
meter := controller.Provider().Meter("test")
mock := controllerTest.NewMockClock()
controller.SetClock(mock)
controller.Start()
// GET the HTTP endpoint
scrape := func() string {
var input bytes.Buffer
resp := httptest.NewRecorder()
req, err := http.NewRequest("GET", "/", &input)
require.NoError(t, err)
exporter.ServeHTTP(resp, req)
data, err := ioutil.ReadAll(resp.Result().Body)
require.NoError(t, err)
return string(data)
}
ctx := context.Background()
counter := metric.Must(meter).NewInt64Counter(
"a.counter",
metric.WithDescription("Counts things"),
)
counter.Add(ctx, 100, kv.String("key", "value"))
// Trigger a push
mock.Add(time.Minute)
runtime.Gosched()
require.Equal(t, `# HELP a_counter Counts things
# TYPE a_counter counter
a_counter{key="value"} 100
`, scrape())
counter.Add(ctx, 100, kv.String("key", "value"))
// Again, now expect cumulative count
mock.Add(time.Minute)
runtime.Gosched()
require.Equal(t, `# HELP a_counter Counts things
# TYPE a_counter counter
a_counter{key="value"} 200
`, scrape())
}

View File

@@ -17,7 +17,6 @@ package stdout_test
import ( import (
"context" "context"
"log" "log"
"time"
"go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/api/metric"
@@ -29,7 +28,7 @@ func ExampleNewExportPipeline() {
pusher, err := stdout.NewExportPipeline(stdout.Config{ pusher, err := stdout.NewExportPipeline(stdout.Config{
PrettyPrint: true, PrettyPrint: true,
DoNotPrintTime: true, DoNotPrintTime: true,
}, time.Minute) })
if err != nil { if err != nil {
log.Fatal("Could not initialize stdout exporter:", err) log.Fatal("Could not initialize stdout exporter:", err)
} }

View File

@@ -29,7 +29,6 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric" export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator" "go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/controller/push" "go.opentelemetry.io/otel/sdk/metric/controller/push"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/selector/simple" "go.opentelemetry.io/otel/sdk/metric/selector/simple"
) )
@@ -120,8 +119,8 @@ func NewRawExporter(config Config) (*Exporter, error) {
// } // }
// defer pipeline.Stop() // defer pipeline.Stop()
// ... Done // ... Done
func InstallNewPipeline(config Config, opts ...push.Option) (*push.Controller, error) { func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller, error) {
controller, err := NewExportPipeline(config, time.Minute, opts...) controller, err := NewExportPipeline(config, options...)
if err != nil { if err != nil {
return controller, err return controller, err
} }
@@ -129,16 +128,22 @@ func InstallNewPipeline(config Config, opts ...push.Option) (*push.Controller, e
return controller, err return controller, err
} }
// NewExportPipeline sets up a complete export pipeline with the recommended setup, // NewExportPipeline sets up a complete export pipeline with the
// chaining a NewRawExporter into the recommended selectors and integrators. // recommended setup, chaining a NewRawExporter into the recommended
func NewExportPipeline(config Config, period time.Duration, opts ...push.Option) (*push.Controller, error) { // selectors and integrators.
selector := simple.NewWithExactDistribution() //
// The pipeline is configured with a stateful integrator unless the
// push.WithStateful(false) option is used.
func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, error) {
exporter, err := NewRawExporter(config) exporter, err := NewRawExporter(config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
integrator := integrator.New(selector, true) pusher := push.New(
pusher := push.New(integrator, exporter, period, opts...) simple.NewWithExactDistribution(),
exporter,
append([]push.Option{push.WithStateful(true)}, options...)...,
)
pusher.Start() pusher.Start()
return pusher, nil return pusher, nil

View File

@@ -17,6 +17,7 @@ package test
import ( import (
"context" "context"
"errors" "errors"
"sync"
"go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label" "go.opentelemetry.io/otel/api/label"
@@ -36,9 +37,10 @@ type mapkey struct {
} }
type CheckpointSet struct { type CheckpointSet struct {
sync.RWMutex
records map[mapkey]export.Record records map[mapkey]export.Record
resource *resource.Resource
updates []export.Record updates []export.Record
resource *resource.Resource
} }
// NewCheckpointSet returns a test CheckpointSet that new records could be added. // NewCheckpointSet returns a test CheckpointSet that new records could be added.

View File

@@ -111,7 +111,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
selector := simple.NewWithExactDistribution() selector := simple.NewWithExactDistribution()
integrator := integrator.New(selector, true) integrator := integrator.New(selector, true)
pusher := push.New(integrator, exp, 60*time.Second) pusher := push.New(integrator, exp)
pusher.Start() pusher.Start()
ctx := context.Background() ctx := context.Background()

View File

@@ -16,6 +16,7 @@ package otlp
import ( import (
"context" "context"
"sync"
"testing" "testing"
colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1" colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1"
@@ -60,10 +61,11 @@ func (m *metricsServiceClientStub) Reset() {
} }
type checkpointSet struct { type checkpointSet struct {
sync.RWMutex
records []metricsdk.Record records []metricsdk.Record
} }
func (m checkpointSet) ForEach(fn func(metricsdk.Record) error) error { func (m *checkpointSet) ForEach(fn func(metricsdk.Record) error) error {
for _, r := range m.records { for _, r := range m.records {
if err := fn(r); err != nil && err != aggregator.ErrNoData { if err := fn(r); err != nil && err != aggregator.ErrNoData {
return err return err
@@ -662,7 +664,7 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, r.resource, agg)) recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, r.resource, agg))
} }
for _, records := range recs { for _, records := range recs {
assert.NoError(t, exp.Export(context.Background(), checkpointSet{records: records})) assert.NoError(t, exp.Export(context.Background(), &checkpointSet{records: records}))
} }
// assert.ElementsMatch does not equate nested slices of different order, // assert.ElementsMatch does not equate nested slices of different order,
@@ -726,7 +728,7 @@ func TestEmptyMetricExport(t *testing.T) {
}, },
} { } {
msc.Reset() msc.Reset()
require.NoError(t, exp.Export(context.Background(), checkpointSet{records: test.records})) require.NoError(t, exp.Export(context.Background(), &checkpointSet{records: test.records}))
assert.Equal(t, test.want, msc.ResourceMetrics()) assert.Equal(t, test.want, msc.ResourceMetrics())
} }
} }

View File

@@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/export/metric"
import ( import (
"context" "context"
"sync"
"go.opentelemetry.io/otel/api/label" "go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/api/metric"
@@ -39,10 +40,6 @@ import (
// single-threaded context from the SDK, after the aggregator is // single-threaded context from the SDK, after the aggregator is
// checkpointed, allowing the integrator to build the set of metrics // checkpointed, allowing the integrator to build the set of metrics
// currently being exported. // currently being exported.
//
// The `CheckpointSet` method is called during collection in a
// single-threaded context from the Exporter, giving the exporter
// access to a producer for iterating over the complete checkpoint.
type Integrator interface { type Integrator interface {
// AggregationSelector is responsible for selecting the // AggregationSelector is responsible for selecting the
// concrete type of Aggregator used for a metric in the SDK. // concrete type of Aggregator used for a metric in the SDK.
@@ -70,17 +67,6 @@ type Integrator interface {
// The Context argument originates from the controller that // The Context argument originates from the controller that
// orchestrates collection. // orchestrates collection.
Process(ctx context.Context, record Record) error Process(ctx context.Context, record Record) error
// CheckpointSet is the interface used by the controller to
// access the fully aggregated checkpoint after collection.
//
// The returned CheckpointSet is passed to the Exporter.
CheckpointSet() CheckpointSet
// FinishedCollection informs the Integrator that a complete
// collection round was completed. Stateless integrators might
// reset state in this method, for example.
FinishedCollection()
} }
// AggregationSelector supports selecting the kind of Aggregator to // AggregationSelector supports selecting the kind of Aggregator to
@@ -173,6 +159,19 @@ type CheckpointSet interface {
// of error will immediately halt ForEach and return // of error will immediately halt ForEach and return
// the error to the caller. // the error to the caller.
ForEach(func(Record) error) error ForEach(func(Record) error) error
// Locker supports locking the checkpoint set. Collection
// into the checkpoint set cannot take place (in case of a
// stateful integrator) while it is locked.
//
// The Integrator attached to the Accumulator MUST be called
// with the lock held.
sync.Locker
// RLock acquires a read lock corresponding to this Locker.
RLock()
// RUnlock releases a read lock corresponding to this Locker.
RUnlock()
} }
// Record contains the exported data for a single metric instrument // Record contains the exported data for a single metric instrument

View File

@@ -15,6 +15,8 @@
package push package push
import ( import (
"time"
sdk "go.opentelemetry.io/otel/sdk/metric" sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
) )
@@ -30,6 +32,14 @@ type Config struct {
// Resource is the OpenTelemetry resource associated with all Meters // Resource is the OpenTelemetry resource associated with all Meters
// created by the Controller. // created by the Controller.
Resource *resource.Resource Resource *resource.Resource
// Stateful causes the controller to maintain state across
// collection events, so that records in the exported
// checkpoint set are cumulative.
Stateful bool
// Period is the interval between calls to Collect a checkpoint.
Period time.Duration
} }
// Option is the interface that applies the value to a configuration option. // Option is the interface that applies the value to a configuration option.
@@ -59,3 +69,25 @@ type resourceOption struct{ *resource.Resource }
func (o resourceOption) Apply(config *Config) { func (o resourceOption) Apply(config *Config) {
config.Resource = o.Resource config.Resource = o.Resource
} }
// WithStateful sets the Stateful configuration option of a Config.
func WithStateful(stateful bool) Option {
return statefulOption(stateful)
}
type statefulOption bool
func (o statefulOption) Apply(config *Config) {
config.Stateful = bool(o)
}
// WithPeriod sets the Period configuration option of a Config.
func WithPeriod(period time.Duration) Option {
return periodOption(period)
}
type periodOption time.Duration
func (o periodOption) Apply(config *Config) {
config.Period = time.Duration(o)
}

View File

@@ -23,77 +23,61 @@ import (
"go.opentelemetry.io/otel/api/metric/registry" "go.opentelemetry.io/otel/api/metric/registry"
export "go.opentelemetry.io/otel/sdk/export/metric" export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric" sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource" controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time"
"go.opentelemetry.io/otel/sdk/metric/integrator/simple"
) )
// DefaultPushPeriod is the default time interval between pushes.
const DefaultPushPeriod = 10 * time.Second
// Controller organizes a periodic push of metric data. // Controller organizes a periodic push of metric data.
type Controller struct { type Controller struct {
lock sync.Mutex lock sync.Mutex
collectLock sync.Mutex
accumulator *sdk.Accumulator accumulator *sdk.Accumulator
provider *registry.Provider provider *registry.Provider
errorHandler sdk.ErrorHandler errorHandler sdk.ErrorHandler
integrator export.Integrator integrator *simple.Integrator
exporter export.Exporter exporter export.Exporter
wg sync.WaitGroup wg sync.WaitGroup
ch chan struct{} ch chan struct{}
period time.Duration period time.Duration
ticker Ticker clock controllerTime.Clock
clock Clock ticker controllerTime.Ticker
} }
// Several types below are created to match "github.com/benbjohnson/clock"
// so that it remains a test-only dependency.
type Clock interface {
Now() time.Time
Ticker(time.Duration) Ticker
}
type Ticker interface {
Stop()
C() <-chan time.Time
}
type realClock struct {
}
type realTicker struct {
ticker *time.Ticker
}
var _ Clock = realClock{}
var _ Ticker = realTicker{}
// New constructs a Controller, an implementation of metric.Provider, // New constructs a Controller, an implementation of metric.Provider,
// using the provided integrator, exporter, collection period, and SDK // using the provided exporter and options to configure an SDK with
// configuration options to configure an SDK with periodic collection. // periodic collection.
// The integrator itself is configured with the aggregation selector policy. func New(selector export.AggregationSelector, exporter export.Exporter, opts ...Option) *Controller {
func New(integrator export.Integrator, exporter export.Exporter, period time.Duration, opts ...Option) *Controller {
c := &Config{ c := &Config{
ErrorHandler: sdk.DefaultErrorHandler, ErrorHandler: sdk.DefaultErrorHandler,
Resource: resource.Empty(), Period: DefaultPushPeriod,
} }
for _, opt := range opts { for _, opt := range opts {
opt.Apply(c) opt.Apply(c)
} }
impl := sdk.NewAccumulator(integrator, sdk.WithErrorHandler(c.ErrorHandler), sdk.WithResource(c.Resource)) integrator := simple.New(selector, c.Stateful)
impl := sdk.NewAccumulator(
integrator,
sdk.WithErrorHandler(c.ErrorHandler),
sdk.WithResource(c.Resource),
)
return &Controller{ return &Controller{
accumulator: impl,
provider: registry.NewProvider(impl), provider: registry.NewProvider(impl),
errorHandler: c.ErrorHandler, accumulator: impl,
integrator: integrator, integrator: integrator,
exporter: exporter, exporter: exporter,
errorHandler: c.ErrorHandler,
ch: make(chan struct{}), ch: make(chan struct{}),
period: period, period: c.Period,
clock: realClock{}, clock: controllerTime.RealClock{},
} }
} }
// SetClock supports setting a mock clock for testing. This must be // SetClock supports setting a mock clock for testing. This must be
// called before Start(). // called before Start().
func (c *Controller) SetClock(clock Clock) { func (c *Controller) SetClock(clock controllerTime.Clock) {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
c.clock = clock c.clock = clock
@@ -162,53 +146,15 @@ func (c *Controller) tick() {
// TODO: either remove the context argument from Export() or // TODO: either remove the context argument from Export() or
// configure a timeout here? // configure a timeout here?
ctx := context.Background() ctx := context.Background()
c.collect(ctx) c.integrator.Lock()
checkpointSet := syncCheckpointSet{ defer c.integrator.Unlock()
mtx: &c.collectLock,
delegate: c.integrator.CheckpointSet(), c.accumulator.Collect(ctx)
}
err := c.exporter.Export(ctx, checkpointSet) err := c.exporter.Export(ctx, c.integrator.CheckpointSet())
c.integrator.FinishedCollection() c.integrator.FinishedCollection()
if err != nil { if err != nil {
c.errorHandler(err) c.errorHandler(err)
} }
} }
func (c *Controller) collect(ctx context.Context) {
c.collectLock.Lock()
defer c.collectLock.Unlock()
c.accumulator.Collect(ctx)
}
// syncCheckpointSet is a wrapper for a CheckpointSet to synchronize
// SDK's collection and reads of a CheckpointSet by an exporter.
type syncCheckpointSet struct {
mtx *sync.Mutex
delegate export.CheckpointSet
}
var _ export.CheckpointSet = (*syncCheckpointSet)(nil)
func (c syncCheckpointSet) ForEach(fn func(export.Record) error) error {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.delegate.ForEach(fn)
}
func (realClock) Now() time.Time {
return time.Now()
}
func (realClock) Ticker(period time.Duration) Ticker {
return realTicker{time.NewTicker(period)}
}
func (t realTicker) Stop() {
t.ticker.Stop()
}
func (t realTicker) C() <-chan time.Time {
return t.ticker.C
}

View File

@@ -22,7 +22,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/benbjohnson/clock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/api/kv"
@@ -33,17 +32,10 @@ import (
"go.opentelemetry.io/otel/sdk/export/metric/aggregator" "go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/controller/push" "go.opentelemetry.io/otel/sdk/metric/controller/push"
controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test"
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
) )
type testIntegrator struct {
t *testing.T
lock sync.Mutex
checkpointSet *test.CheckpointSet
checkpoints int
finishes int
}
var testResource = resource.New(kv.String("R", "V")) var testResource = resource.New(kv.String("R", "V"))
type testExporter struct { type testExporter struct {
@@ -56,69 +48,27 @@ type testExporter struct {
type testFixture struct { type testFixture struct {
checkpointSet *test.CheckpointSet checkpointSet *test.CheckpointSet
integrator *testIntegrator
exporter *testExporter exporter *testExporter
} }
type mockClock struct { type testSelector struct{}
mock *clock.Mock
}
type mockTicker struct {
ticker *clock.Ticker
}
var _ push.Clock = mockClock{}
var _ push.Ticker = mockTicker{}
func newFixture(t *testing.T) testFixture { func newFixture(t *testing.T) testFixture {
checkpointSet := test.NewCheckpointSet(testResource) checkpointSet := test.NewCheckpointSet(testResource)
integrator := &testIntegrator{
t: t,
checkpointSet: checkpointSet,
}
exporter := &testExporter{ exporter := &testExporter{
t: t, t: t,
} }
return testFixture{ return testFixture{
checkpointSet: checkpointSet, checkpointSet: checkpointSet,
integrator: integrator,
exporter: exporter, exporter: exporter,
} }
} }
func (b *testIntegrator) AggregatorFor(*metric.Descriptor) export.Aggregator { func (testSelector) AggregatorFor(*metric.Descriptor) export.Aggregator {
return sum.New() return sum.New()
} }
func (b *testIntegrator) CheckpointSet() export.CheckpointSet {
b.lock.Lock()
defer b.lock.Unlock()
b.checkpoints++
return b.checkpointSet
}
func (b *testIntegrator) FinishedCollection() {
b.lock.Lock()
defer b.lock.Unlock()
b.finishes++
}
func (b *testIntegrator) Process(_ context.Context, record export.Record) error {
b.lock.Lock()
defer b.lock.Unlock()
labels := record.Labels().ToSlice()
b.checkpointSet.Add(record.Descriptor(), record.Aggregator(), labels...)
return nil
}
func (b *testIntegrator) getCounts() (checkpoints, finishes int) {
b.lock.Lock()
defer b.lock.Unlock()
return b.checkpoints, b.finishes
}
func (e *testExporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { func (e *testExporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
e.lock.Lock() e.lock.Lock()
defer e.lock.Unlock() defer e.lock.Unlock()
@@ -147,29 +97,9 @@ func (e *testExporter) resetRecords() ([]export.Record, int) {
return r, e.exports return r, e.exports
} }
func (c mockClock) Now() time.Time {
return c.mock.Now()
}
func (c mockClock) Ticker(period time.Duration) push.Ticker {
return mockTicker{c.mock.Ticker(period)}
}
func (c mockClock) Add(d time.Duration) {
c.mock.Add(d)
}
func (t mockTicker) Stop() {
t.ticker.Stop()
}
func (t mockTicker) C() <-chan time.Time {
return t.ticker.C
}
func TestPushDoubleStop(t *testing.T) { func TestPushDoubleStop(t *testing.T) {
fix := newFixture(t) fix := newFixture(t)
p := push.New(fix.integrator, fix.exporter, time.Second) p := push.New(testSelector{}, fix.exporter)
p.Start() p.Start()
p.Stop() p.Stop()
p.Stop() p.Stop()
@@ -177,7 +107,7 @@ func TestPushDoubleStop(t *testing.T) {
func TestPushDoubleStart(t *testing.T) { func TestPushDoubleStart(t *testing.T) {
fix := newFixture(t) fix := newFixture(t)
p := push.New(fix.integrator, fix.exporter, time.Second) p := push.New(testSelector{}, fix.exporter)
p.Start() p.Start()
p.Start() p.Start()
p.Stop() p.Stop()
@@ -186,10 +116,15 @@ func TestPushDoubleStart(t *testing.T) {
func TestPushTicker(t *testing.T) { func TestPushTicker(t *testing.T) {
fix := newFixture(t) fix := newFixture(t)
p := push.New(fix.integrator, fix.exporter, time.Second) p := push.New(
testSelector{},
fix.exporter,
push.WithPeriod(time.Second),
push.WithResource(testResource),
)
meter := p.Provider().Meter("name") meter := p.Provider().Meter("name")
mock := mockClock{clock.NewMock()} mock := controllerTest.NewMockClock()
p.SetClock(mock) p.SetClock(mock)
ctx := context.Background() ctx := context.Background()
@@ -201,9 +136,6 @@ func TestPushTicker(t *testing.T) {
counter.Add(ctx, 3) counter.Add(ctx, 3)
records, exports := fix.exporter.resetRecords() records, exports := fix.exporter.resetRecords()
checkpoints, finishes := fix.integrator.getCounts()
require.Equal(t, 0, checkpoints)
require.Equal(t, 0, finishes)
require.Equal(t, 0, exports) require.Equal(t, 0, exports)
require.Equal(t, 0, len(records)) require.Equal(t, 0, len(records))
@@ -211,9 +143,6 @@ func TestPushTicker(t *testing.T) {
runtime.Gosched() runtime.Gosched()
records, exports = fix.exporter.resetRecords() records, exports = fix.exporter.resetRecords()
checkpoints, finishes = fix.integrator.getCounts()
require.Equal(t, 1, checkpoints)
require.Equal(t, 1, finishes)
require.Equal(t, 1, exports) require.Equal(t, 1, exports)
require.Equal(t, 1, len(records)) require.Equal(t, 1, len(records))
require.Equal(t, "counter", records[0].Descriptor().Name()) require.Equal(t, "counter", records[0].Descriptor().Name())
@@ -231,9 +160,6 @@ func TestPushTicker(t *testing.T) {
runtime.Gosched() runtime.Gosched()
records, exports = fix.exporter.resetRecords() records, exports = fix.exporter.resetRecords()
checkpoints, finishes = fix.integrator.getCounts()
require.Equal(t, 2, checkpoints)
require.Equal(t, 2, finishes)
require.Equal(t, 2, exports) require.Equal(t, 2, exports)
require.Equal(t, 1, len(records)) require.Equal(t, 1, len(records))
require.Equal(t, "counter", records[0].Descriptor().Name()) require.Equal(t, "counter", records[0].Descriptor().Name())
@@ -271,7 +197,12 @@ func TestPushExportError(t *testing.T) {
fix := newFixture(t) fix := newFixture(t)
fix.exporter.injectErr = injector("counter1", tt.injectedError) fix.exporter.injectErr = injector("counter1", tt.injectedError)
p := push.New(fix.integrator, fix.exporter, time.Second) p := push.New(
testSelector{},
fix.exporter,
push.WithPeriod(time.Second),
push.WithResource(testResource),
)
var err error var err error
var lock sync.Mutex var lock sync.Mutex
@@ -281,7 +212,7 @@ func TestPushExportError(t *testing.T) {
err = sdkErr err = sdkErr
}) })
mock := mockClock{clock.NewMock()} mock := controllerTest.NewMockClock()
p.SetClock(mock) p.SetClock(mock)
ctx := context.Background() ctx := context.Background()
@@ -303,10 +234,7 @@ func TestPushExportError(t *testing.T) {
runtime.Gosched() runtime.Gosched()
records, exports := fix.exporter.resetRecords() records, exports := fix.exporter.resetRecords()
checkpoints, finishes := fix.integrator.getCounts()
require.Equal(t, 1, exports) require.Equal(t, 1, exports)
require.Equal(t, 1, checkpoints)
require.Equal(t, 1, finishes)
lock.Lock() lock.Lock()
if tt.expectedError == nil { if tt.expectedError == nil {
require.NoError(t, err) require.NoError(t, err)

View File

@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"time"
"github.com/benbjohnson/clock"
controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time"
)
type MockClock struct {
mock *clock.Mock
}
type MockTicker struct {
ticker *clock.Ticker
}
var _ controllerTime.Clock = MockClock{}
var _ controllerTime.Ticker = MockTicker{}
func NewMockClock() MockClock {
return MockClock{clock.NewMock()}
}
func (c MockClock) Now() time.Time {
return c.mock.Now()
}
func (c MockClock) Ticker(period time.Duration) controllerTime.Ticker {
return MockTicker{c.mock.Ticker(period)}
}
func (c MockClock) Add(d time.Duration) {
c.mock.Add(d)
}
func (t MockTicker) Stop() {
t.ticker.Stop()
}
func (t MockTicker) C() <-chan time.Time {
return t.ticker.C
}

View File

@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package time // import "go.opentelemetry.io/otel/sdk/metric/controller/time"
import (
"time"
lib "time"
)
// Several types below are created to match "github.com/benbjohnson/clock"
// so that it remains a test-only dependency.
type Clock interface {
Now() lib.Time
Ticker(lib.Duration) Ticker
}
type Ticker interface {
Stop()
C() <-chan lib.Time
}
type RealClock struct {
}
type RealTicker struct {
ticker *lib.Ticker
}
var _ Clock = RealClock{}
var _ Ticker = RealTicker{}
func (RealClock) Now() time.Time {
return time.Now()
}
func (RealClock) Ticker(period time.Duration) Ticker {
return RealTicker{time.NewTicker(period)}
}
func (t RealTicker) Stop() {
t.ticker.Stop()
}
func (t RealTicker) C() <-chan time.Time {
return t.ticker.C
}

View File

@@ -17,7 +17,6 @@ package metric_test
import ( import (
"context" "context"
"fmt" "fmt"
"time"
"go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/api/kv"
@@ -29,7 +28,7 @@ func ExampleNew() {
pusher, err := stdout.NewExportPipeline(stdout.Config{ pusher, err := stdout.NewExportPipeline(stdout.Config{
PrettyPrint: true, PrettyPrint: true,
DoNotPrintTime: true, // This makes the output deterministic DoNotPrintTime: true, // This makes the output deterministic
}, time.Minute) })
if err != nil { if err != nil {
panic(fmt.Sprintln("Could not initialize stdout exporter:", err)) panic(fmt.Sprintln("Could not initialize stdout exporter:", err))
} }

View File

@@ -17,6 +17,7 @@ package simple // import "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
import ( import (
"context" "context"
"errors" "errors"
"sync"
"go.opentelemetry.io/otel/api/label" "go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/api/metric"
@@ -27,9 +28,9 @@ import (
type ( type (
Integrator struct { Integrator struct {
selector export.AggregationSelector export.AggregationSelector
batchMap batchMap
stateful bool stateful bool
batch
} }
batchKey struct { batchKey struct {
@@ -44,24 +45,26 @@ type (
resource *resource.Resource resource *resource.Resource
} }
batchMap map[batchKey]batchValue batch struct {
// RWMutex implements locking for the `CheckpoingSet` interface.
sync.RWMutex
values map[batchKey]batchValue
}
) )
var _ export.Integrator = &Integrator{} var _ export.Integrator = &Integrator{}
var _ export.CheckpointSet = batchMap{} var _ export.CheckpointSet = &batch{}
func New(selector export.AggregationSelector, stateful bool) *Integrator { func New(selector export.AggregationSelector, stateful bool) *Integrator {
return &Integrator{ return &Integrator{
selector: selector, AggregationSelector: selector,
batchMap: batchMap{}, stateful: stateful,
stateful: stateful, batch: batch{
values: map[batchKey]batchValue{},
},
} }
} }
func (b *Integrator) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
return b.selector.AggregatorFor(descriptor)
}
func (b *Integrator) Process(_ context.Context, record export.Record) error { func (b *Integrator) Process(_ context.Context, record export.Record) error {
desc := record.Descriptor() desc := record.Descriptor()
key := batchKey{ key := batchKey{
@@ -70,7 +73,7 @@ func (b *Integrator) Process(_ context.Context, record export.Record) error {
resource: record.Resource().Equivalent(), resource: record.Resource().Equivalent(),
} }
agg := record.Aggregator() agg := record.Aggregator()
value, ok := b.batchMap[key] value, ok := b.batch.values[key]
if ok { if ok {
// Note: The call to Merge here combines only // Note: The call to Merge here combines only
// identical records. It is required even for a // identical records. It is required even for a
@@ -92,7 +95,7 @@ func (b *Integrator) Process(_ context.Context, record export.Record) error {
return err return err
} }
} }
b.batchMap[key] = batchValue{ b.batch.values[key] = batchValue{
aggregator: agg, aggregator: agg,
labels: record.Labels(), labels: record.Labels(),
resource: record.Resource(), resource: record.Resource(),
@@ -101,17 +104,17 @@ func (b *Integrator) Process(_ context.Context, record export.Record) error {
} }
func (b *Integrator) CheckpointSet() export.CheckpointSet { func (b *Integrator) CheckpointSet() export.CheckpointSet {
return b.batchMap return &b.batch
} }
func (b *Integrator) FinishedCollection() { func (b *Integrator) FinishedCollection() {
if !b.stateful { if !b.stateful {
b.batchMap = batchMap{} b.batch.values = map[batchKey]batchValue{}
} }
} }
func (c batchMap) ForEach(f func(export.Record) error) error { func (b *batch) ForEach(f func(export.Record) error) error {
for key, value := range c { for key, value := range b.values {
if err := f(export.NewRecord( if err := f(export.NewRecord(
key.descriptor, key.descriptor,
value.labels, value.labels,

View File

@@ -29,7 +29,7 @@ import (
// These tests use the ../test label encoding. // These tests use the ../test label encoding.
func TestUngroupedStateless(t *testing.T) { func TestSimpleStateless(t *testing.T) {
ctx := context.Background() ctx := context.Background()
b := simple.New(test.NewAggregationSelector(), false) b := simple.New(test.NewAggregationSelector(), false)
@@ -60,7 +60,6 @@ func TestUngroupedStateless(t *testing.T) {
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 50)) _ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 50))
checkpointSet := b.CheckpointSet() checkpointSet := b.CheckpointSet()
b.FinishedCollection()
records := test.NewOutput(test.SdkEncoder) records := test.NewOutput(test.SdkEncoder)
_ = checkpointSet.ForEach(records.AddTo) _ = checkpointSet.ForEach(records.AddTo)
@@ -81,17 +80,18 @@ func TestUngroupedStateless(t *testing.T) {
"lastvalue.b/C~D&E~F/R~V": 20, // labels2 "lastvalue.b/C~D&E~F/R~V": 20, // labels2
"lastvalue.b//R~V": 30, // labels3 "lastvalue.b//R~V": 30, // labels3
}, records.Map) }, records.Map)
b.FinishedCollection()
// Verify that state was reset // Verify that state was reset
checkpointSet = b.CheckpointSet() checkpointSet = b.CheckpointSet()
b.FinishedCollection()
_ = checkpointSet.ForEach(func(rec export.Record) error { _ = checkpointSet.ForEach(func(rec export.Record) error {
t.Fatal("Unexpected call") t.Fatal("Unexpected call")
return nil return nil
}) })
b.FinishedCollection()
} }
func TestUngroupedStateful(t *testing.T) { func TestSimpleStateful(t *testing.T) {
ctx := context.Background() ctx := context.Background()
b := simple.New(test.NewAggregationSelector(), true) b := simple.New(test.NewAggregationSelector(), true)
@@ -116,12 +116,12 @@ func TestUngroupedStateful(t *testing.T) {
// Test that state was NOT reset // Test that state was NOT reset
checkpointSet = b.CheckpointSet() checkpointSet = b.CheckpointSet()
b.FinishedCollection()
records2 := test.NewOutput(test.SdkEncoder) records2 := test.NewOutput(test.SdkEncoder)
_ = checkpointSet.ForEach(records2.AddTo) _ = checkpointSet.ForEach(records2.AddTo)
require.EqualValues(t, records1.Map, records2.Map) require.EqualValues(t, records1.Map, records2.Map)
b.FinishedCollection()
// Update and re-checkpoint the original record. // Update and re-checkpoint the original record.
_ = caggA.Update(ctx, metric.NewInt64Number(20), &test.CounterADesc) _ = caggA.Update(ctx, metric.NewInt64Number(20), &test.CounterADesc)
@@ -132,19 +132,18 @@ func TestUngroupedStateful(t *testing.T) {
// As yet cagg has not been passed to Integrator.Process. Should // As yet cagg has not been passed to Integrator.Process. Should
// not see an update. // not see an update.
checkpointSet = b.CheckpointSet() checkpointSet = b.CheckpointSet()
b.FinishedCollection()
records3 := test.NewOutput(test.SdkEncoder) records3 := test.NewOutput(test.SdkEncoder)
_ = checkpointSet.ForEach(records3.AddTo) _ = checkpointSet.ForEach(records3.AddTo)
require.EqualValues(t, records1.Map, records3.Map) require.EqualValues(t, records1.Map, records3.Map)
b.FinishedCollection()
// Now process the second update // Now process the second update
_ = b.Process(ctx, export.NewRecord(&test.CounterADesc, test.Labels1, test.Resource, caggA)) _ = b.Process(ctx, export.NewRecord(&test.CounterADesc, test.Labels1, test.Resource, caggA))
_ = b.Process(ctx, export.NewRecord(&test.CounterBDesc, test.Labels1, test.Resource, caggB)) _ = b.Process(ctx, export.NewRecord(&test.CounterBDesc, test.Labels1, test.Resource, caggB))
checkpointSet = b.CheckpointSet() checkpointSet = b.CheckpointSet()
b.FinishedCollection()
records4 := test.NewOutput(test.SdkEncoder) records4 := test.NewOutput(test.SdkEncoder)
_ = checkpointSet.ForEach(records4.AddTo) _ = checkpointSet.ForEach(records4.AddTo)
@@ -153,4 +152,5 @@ func TestUngroupedStateful(t *testing.T) {
"sum.a/C~D&G~H/R~V": 30, "sum.a/C~D&G~H/R~V": 30,
"sum.b/C~D&G~H/R~V": 30, "sum.b/C~D&G~H/R~V": 30,
}, records4.Map) }, records4.Map)
b.FinishedCollection()
} }

View File

@@ -370,6 +370,7 @@ func (m *Accumulator) Collect(ctx context.Context) int {
checkpointed := m.collectSyncInstruments(ctx) checkpointed := m.collectSyncInstruments(ctx)
checkpointed += m.observeAsyncInstruments(ctx) checkpointed += m.observeAsyncInstruments(ctx)
m.currentEpoch++ m.currentEpoch++
return checkpointed return checkpointed
} }