1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-07-15 01:04:25 +02:00

Separate InstrumentationLibrary from metric.Descriptor (#2197)

* factor instrumentation library out of the instrument descriptor

* SDK tests pass

* checkpoint work

* otlp and opencensus tests passing

* prometheus

* tests pass, working on lint

* lint applied: MetricReader->Reader

* comments

* Changelog

* Apply suggestions from code review

Co-authored-by: alrex <alrex.boten@gmail.com>

* remove an interdependency

* fix build

* re-indent one

* Apply suggestions from code review

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* Lint&feedback

* update after rename

* comment fix

* style fix for meter options

* remove libraryReader, let Controller implement the reader API directly

* rename 'impl' field to 'provider'

* remove a type assertion

* move metric/registry into internal; move registry.MeterProvider into metric controller

* add test for controller registry function

* CheckpointSet->Reader everywhere

* lint

* remove two unnecessary accessor methods; Controller implements MeterProvider and InstrumentationLibraryReader directly, no need to get these

* use a sync.Map

* ensure the initOnce is always called; handle multiple errors

* Apply suggestions from code review

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>

* cleanup locking in metrictest

* Revert "ensure the initOnce is always called; handle multiple errors"

This reverts commit 3356eb5ed0.

* Revert "use a sync.Map"

This reverts commit ea7bc599bd.

* restore the TODO about sync.Map

Co-authored-by: alrex <alrex.boten@gmail.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
This commit is contained in:
Joshua MacDonald
2021-09-27 08:51:47 -07:00
committed by GitHub
parent 92551d3933
commit 3c8e1853f0
54 changed files with 1232 additions and 979 deletions

View File

@ -21,9 +21,10 @@ import (
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/metric/registry"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/registry"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdk "go.opentelemetry.io/otel/sdk/metric"
controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time"
"go.opentelemetry.io/otel/sdk/resource"
@ -52,19 +53,25 @@ var ErrControllerStarted = fmt.Errorf("controller already started")
// collection
//
// The controller supports mixing push and pull access to metric data
// using the export.CheckpointSet RWLock interface. Collection will
// using the export.Reader RWLock interface. Collection will
// be blocked by a pull request in the basic controller.
type Controller struct {
lock sync.Mutex
accumulator *sdk.Accumulator
provider *registry.MeterProvider
checkpointer export.Checkpointer
resource *resource.Resource
exporter export.Exporter
wg sync.WaitGroup
stopCh chan struct{}
clock controllerTime.Clock
ticker controllerTime.Ticker
// lock protects libraries and synchronizes Start() and Stop().
lock sync.Mutex
// TODO: libraries is synchronized by lock, but could be
// accomplished using a sync.Map. The SDK specification will
// probably require this, as the draft already states that
// Stop() and MeterProvider.Meter() should not block each
// other.
libraries map[instrumentation.Library]*registry.UniqueInstrumentMeterImpl
checkpointerFactory export.CheckpointerFactory
resource *resource.Resource
exporter export.Exporter
wg sync.WaitGroup
stopCh chan struct{}
clock controllerTime.Clock
ticker controllerTime.Ticker
collectPeriod time.Duration
collectTimeout time.Duration
@ -75,10 +82,44 @@ type Controller struct {
collectedTime time.Time
}
// New constructs a Controller using the provided checkpointer and
// options (including optional exporter) to configure a metric
var _ export.InstrumentationLibraryReader = &Controller{}
var _ metric.MeterProvider = &Controller{}
func (c *Controller) Meter(instrumentationName string, opts ...metric.MeterOption) metric.Meter {
cfg := metric.NewMeterConfig(opts...)
library := instrumentation.Library{
Name: instrumentationName,
Version: cfg.InstrumentationVersion(),
SchemaURL: cfg.SchemaURL(),
}
c.lock.Lock()
defer c.lock.Unlock()
m, ok := c.libraries[library]
if !ok {
checkpointer := c.checkpointerFactory.NewCheckpointer()
accumulator := sdk.NewAccumulator(checkpointer)
m = registry.NewUniqueInstrumentMeterImpl(&accumulatorCheckpointer{
Accumulator: accumulator,
checkpointer: checkpointer,
library: library,
})
c.libraries[library] = m
}
return metric.WrapMeterImpl(m)
}
type accumulatorCheckpointer struct {
*sdk.Accumulator
checkpointer export.Checkpointer
library instrumentation.Library
}
// New constructs a Controller using the provided checkpointer factory
// and options (including optional exporter) to configure a metric
// export pipeline.
func New(checkpointer export.Checkpointer, opts ...Option) *Controller {
func New(checkpointerFactory export.CheckpointerFactory, opts ...Option) *Controller {
c := &config{
CollectPeriod: DefaultPeriod,
CollectTimeout: DefaultPeriod,
@ -96,15 +137,13 @@ func New(checkpointer export.Checkpointer, opts ...Option) *Controller {
otel.Handle(err)
}
}
impl := sdk.NewAccumulator(checkpointer)
return &Controller{
provider: registry.NewMeterProvider(impl),
accumulator: impl,
checkpointer: checkpointer,
resource: c.Resource,
exporter: c.Exporter,
stopCh: nil,
clock: controllerTime.RealClock{},
libraries: map[instrumentation.Library]*registry.UniqueInstrumentMeterImpl{},
checkpointerFactory: checkpointerFactory,
exporter: c.Exporter,
resource: c.Resource,
stopCh: nil,
clock: controllerTime.RealClock{},
collectPeriod: c.CollectPeriod,
collectTimeout: c.CollectTimeout,
@ -120,11 +159,6 @@ func (c *Controller) SetClock(clock controllerTime.Clock) {
c.clock = clock
}
// MeterProvider returns a MeterProvider instance for this controller.
func (c *Controller) MeterProvider() metric.MeterProvider {
return c.provider
}
// Resource returns the *resource.Resource associated with this
// controller.
func (c *Controller) Resource() *resource.Resource {
@ -165,19 +199,23 @@ func (c *Controller) Start(ctx context.Context) error {
//
// Note that Stop() will not cancel an ongoing collection or export.
func (c *Controller) Stop(ctx context.Context) error {
c.lock.Lock()
defer c.lock.Unlock()
if lastCollection := func() bool {
c.lock.Lock()
defer c.lock.Unlock()
if c.stopCh == nil {
if c.stopCh == nil {
return false
}
close(c.stopCh)
c.stopCh = nil
c.wg.Wait()
c.ticker.Stop()
c.ticker = nil
return true
}(); !lastCollection {
return nil
}
close(c.stopCh)
c.stopCh = nil
c.wg.Wait()
c.ticker.Stop()
c.ticker = nil
return c.collect(ctx)
}
@ -198,9 +236,7 @@ func (c *Controller) runTicker(ctx context.Context, stopCh chan struct{}) {
// collect computes a checkpoint and optionally exports it.
func (c *Controller) collect(ctx context.Context) error {
if err := c.checkpoint(ctx, func() bool {
return true
}); err != nil {
if err := c.checkpoint(ctx); err != nil {
return err
}
if c.exporter == nil {
@ -212,19 +248,45 @@ func (c *Controller) collect(ctx context.Context) error {
return c.export(ctx)
}
// accumulatorList returns a snapshot of current accumulators
// registered to this controller. This briefly locks the controller.
func (c *Controller) accumulatorList() []*accumulatorCheckpointer {
c.lock.Lock()
defer c.lock.Unlock()
var r []*accumulatorCheckpointer
for _, entry := range c.libraries {
acc, ok := entry.MeterImpl().(*accumulatorCheckpointer)
if ok {
r = append(r, acc)
}
}
return r
}
// checkpoint calls the Accumulator and Checkpointer interfaces to
// compute the CheckpointSet. This applies the configured collection
// compute the Reader. This applies the configured collection
// timeout. Note that this does not try to cancel a Collect or Export
// when Stop() is called.
func (c *Controller) checkpoint(ctx context.Context, cond func() bool) error {
ckpt := c.checkpointer.CheckpointSet()
func (c *Controller) checkpoint(ctx context.Context) error {
for _, impl := range c.accumulatorList() {
if err := c.checkpointSingleAccumulator(ctx, impl); err != nil {
return err
}
}
return nil
}
// checkpointSingleAccumulator checkpoints a single instrumentation
// library's accumulator, which involves calling
// checkpointer.StartCollection, accumulator.Collect, and
// checkpointer.FinishCollection in sequence.
func (c *Controller) checkpointSingleAccumulator(ctx context.Context, ac *accumulatorCheckpointer) error {
ckpt := ac.checkpointer.Reader()
ckpt.Lock()
defer ckpt.Unlock()
if !cond() {
return nil
}
c.checkpointer.StartCollection()
ac.checkpointer.StartCollection()
if c.collectTimeout > 0 {
var cancel context.CancelFunc
@ -232,7 +294,7 @@ func (c *Controller) checkpoint(ctx context.Context, cond func() bool) error {
defer cancel()
}
_ = c.accumulator.Collect(ctx)
_ = ac.Accumulator.Collect(ctx)
var err error
select {
@ -243,7 +305,7 @@ func (c *Controller) checkpoint(ctx context.Context, cond func() bool) error {
}
// Finish the checkpoint whether the accumulator timed out or not.
if cerr := c.checkpointer.FinishCollection(); cerr != nil {
if cerr := ac.checkpointer.FinishCollection(); cerr != nil {
if err == nil {
err = cerr
} else {
@ -254,34 +316,36 @@ func (c *Controller) checkpoint(ctx context.Context, cond func() bool) error {
return err
}
// export calls the exporter with a read lock on the CheckpointSet,
// export calls the exporter with a read lock on the Reader,
// applying the configured export timeout.
func (c *Controller) export(ctx context.Context) error {
ckpt := c.checkpointer.CheckpointSet()
ckpt.RLock()
defer ckpt.RUnlock()
if c.pushTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, c.pushTimeout)
defer cancel()
}
return c.exporter.Export(ctx, c.resource, ckpt)
return c.exporter.Export(ctx, c.resource, c)
}
// ForEach gives the caller read-locked access to the current
// export.CheckpointSet.
func (c *Controller) ForEach(ks export.ExportKindSelector, f func(export.Record) error) error {
ckpt := c.checkpointer.CheckpointSet()
ckpt.RLock()
defer ckpt.RUnlock()
return ckpt.ForEach(ks, f)
// ForEach implements export.InstrumentationLibraryReader.
func (c *Controller) ForEach(readerFunc func(l instrumentation.Library, r export.Reader) error) error {
for _, acPair := range c.accumulatorList() {
reader := acPair.checkpointer.Reader()
// TODO: We should not fail fast; instead accumulate errors.
if err := func() error {
reader.RLock()
defer reader.RUnlock()
return readerFunc(acPair.library, reader)
}(); err != nil {
return err
}
}
return nil
}
// IsRunning returns true if the controller was started via Start(),
// indicating that the current export.CheckpointSet is being kept
// indicating that the current export.Reader is being kept
// up-to-date.
func (c *Controller) IsRunning() bool {
c.lock.Lock()
@ -298,16 +362,20 @@ func (c *Controller) Collect(ctx context.Context) error {
// computing checkpoints with the collection period.
return ErrControllerStarted
}
if !c.shouldCollect() {
return nil
}
return c.checkpoint(ctx, c.shouldCollect)
return c.checkpoint(ctx)
}
// shouldCollect returns true if the collector should collect now,
// based on the timestamp, the last collection time, and the
// configured period.
func (c *Controller) shouldCollect() bool {
// This is called with the CheckpointSet exclusive
// lock held.
c.lock.Lock()
defer c.lock.Unlock()
if c.collectPeriod == 0 {
return true
}