
Refactor Pipeline (#3233)

* Add views field to pipeline

Redundant maps tracking readers to views and readers to pipelines exist
in the pipelineRegistry. Unify these maps by tracking views in pipelines.

* Rename newPipelineRegistries->newPipelineRegistry

* Add Reader as field to pipeline

* Replace createAggregators with resolver facilitator

* Replace create agg funcs with inserter facilitator

* Correct documentation

* Replace pipelineRegistry with []pipeline type

* Rename newPipelineRegistry->newPipelines

* Fix pipeline_registry_test

* Flatten isMonotonic into only use

* Update FIXME into TODO

* Rename instrument provider resolver field to resolve

* Fix comment English

* Fix drop aggregator detection
This commit is contained in:
Tyler Yahn
2022-09-28 08:47:20 -07:00
committed by GitHub
parent 12e16d41e7
commit aca054b075
6 changed files with 264 additions and 215 deletions
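The shape of the refactor: the two parallel maps in pipelineRegistry (reader to views, reader to pipeline) collapse into a flat slice of pipelines, each carrying its own Reader and views. A minimal standalone sketch of that composition, with View and Reader reduced to trivial stand-ins for the SDK's types (the field names here are assumptions for illustration):

package main

import "fmt"

// Simplified stand-ins for the SDK's view.View and Reader types.
type View struct{ Match, Rename string }
type Reader struct{ Name string }

// Each pipeline now owns its Reader and the views that apply to it,
// replacing the old map[Reader][]view.View / map[Reader]*pipeline pair.
type pipeline struct {
	reader Reader
	views  []View
}

type pipelines []*pipeline

func newPipelines(readers map[Reader][]View) pipelines {
	pipes := make(pipelines, 0, len(readers))
	for r, v := range readers {
		pipes = append(pipes, &pipeline{reader: r, views: v})
	}
	return pipes
}

func main() {
	pipes := newPipelines(map[Reader][]View{
		{Name: "periodic"}: {{Match: "latency", Rename: "http.latency"}},
	})
	for _, p := range pipes {
		fmt.Printf("%s: %d view(s)\n", p.reader.Name, len(p.views))
	}
}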


@@ -30,9 +30,25 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
)
var (
errCreatingAggregators = errors.New("could not create all aggregators")
errIncompatibleAggregation = errors.New("incompatible aggregation")
errUnknownAggregation = errors.New("unrecognized aggregation")
)
type aggregator interface {
Aggregation() metricdata.Aggregation
}
// instrumentID is used to identify multiple instruments being mapped to the
// same aggregator, e.g. when an exact-match view and a name=* wildcard view
// both match the same instrument. You can't use a view.Instrument here
// because not all Aggregators are comparable.
type instrumentID struct {
scope instrumentation.Scope
name string
description string
}
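instrumentID works as a deduplication key because all of its fields are comparable, so it can index a map where view.Instrument cannot. A sketch of the pattern in isolation (fields copied from the diff; instrumentation.Scope reduced to a string to keep the example self-contained):

package main

import "fmt"

// scope stands in for instrumentation.Scope; a string keeps the key comparable.
type instrumentID struct {
	scope, name, description string
}

func main() {
	seen := map[instrumentID]struct{}{}
	ids := []instrumentID{
		{"app", "requests", "count of requests"},
		{"app", "requests", "count of requests"}, // duplicate from a second matching view
	}
	for _, id := range ids {
		if _, ok := seen[id]; ok {
			continue // already aggregated once; skip the duplicate
		}
		seen[id] = struct{}{}
		fmt.Println("creating aggregator for", id.name)
	}
}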
type instrumentKey struct {
name string
unit unit.Unit
@@ -43,12 +59,14 @@ type instrumentValue struct {
aggregator aggregator
}
func newPipeline(res *resource.Resource) *pipeline {
func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipeline {
if res == nil {
res = resource.Empty()
}
return &pipeline{
resource: res,
reader: reader,
views: views,
aggregations: make(map[instrumentation.Scope]map[instrumentKey]instrumentValue),
}
}
@@ -61,6 +79,9 @@ func newPipeline(res *resource.Resource) *pipeline {
type pipeline struct {
resource *resource.Resource
reader Reader
views []view.View
sync.Mutex
aggregations map[instrumentation.Scope]map[instrumentKey]instrumentValue
callbacks []func(context.Context)
@@ -155,53 +176,198 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
}, nil
}
// pipelineRegistry manages creating pipelines and aggregators. Meters
// retrieve new Aggregators from a pipelineRegistry.
type pipelineRegistry struct {
views map[Reader][]view.View
pipelines map[Reader]*pipeline
// inserter facilitates inserting new instruments into a pipeline.
type inserter[N int64 | float64] struct {
pipeline *pipeline
}
func newPipelineRegistries(res *resource.Resource, views map[Reader][]view.View) *pipelineRegistry {
pipelines := map[Reader]*pipeline{}
for rdr := range views {
pipe := &pipeline{resource: res}
rdr.register(pipe)
pipelines[rdr] = pipe
func newInserter[N int64 | float64](p *pipeline) *inserter[N] {
return &inserter[N]{p}
}
// Instrument inserts the instrument inst with unit instUnit, returning the
// Aggregators that need to be updated with measurements for that instrument.
func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
seen := map[instrumentID]struct{}{}
var aggs []internal.Aggregator[N]
errs := &multierror{wrapped: errCreatingAggregators}
for _, v := range i.pipeline.views {
inst, match := v.TransformInstrument(inst)
id := instrumentID{
scope: inst.Scope,
name: inst.Name,
description: inst.Description,
}
if _, ok := seen[id]; ok || !match {
continue
}
if inst.Aggregation == nil {
inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind)
} else if _, ok := inst.Aggregation.(aggregation.Default); ok {
inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind)
}
if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil {
err = fmt.Errorf("creating aggregator with instrumentKind: %d, aggregation %v: %w", inst.Kind, inst.Aggregation, err)
errs.append(err)
continue
}
agg, err := i.aggregator(inst)
if err != nil {
errs.append(err)
continue
}
if agg == nil { // Drop aggregator.
continue
}
// TODO (#3011): If filtering is done at the instrument level add here.
// This is where the aggregator and the view are both in scope.
aggs = append(aggs, agg)
seen[id] = struct{}{}
err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg)
if err != nil {
errs.append(err)
}
}
return &pipelineRegistry{
views: views,
pipelines: pipelines,
// TODO(#3224): handle when no views match. The reader's default
// aggregation should be returned.
return aggs, errs.errorOrNil()
}
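Within that loop, a nil or aggregation.Default selection is replaced with the Reader's default for the instrument kind before compatibility is checked. A minimal sketch of just that substitution, with Aggregation as a plain interface and the reader default as a function (both simplifications, not the SDK API):

package main

import "fmt"

type Aggregation interface{ name() string }

// Default marks "use the reader's default aggregation".
type Default struct{}

func (Default) name() string { return "default" }

type Sum struct{}

func (Sum) name() string { return "sum" }

// readerDefault stands in for i.pipeline.reader.aggregation(kind).
func readerDefault(kind string) Aggregation { return Sum{} }

func resolve(agg Aggregation, kind string) Aggregation {
	if agg == nil {
		return readerDefault(kind)
	}
	if _, ok := agg.(Default); ok {
		return readerDefault(kind)
	}
	return agg // explicit choice kept as-is
}

func main() {
	fmt.Println(resolve(nil, "SyncCounter").name())       // sum
	fmt.Println(resolve(Default{}, "SyncCounter").name()) // sum
	fmt.Println(resolve(Sum{}, "SyncCounter").name())     // sum
}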
// aggregator returns the Aggregator for an instrument configuration. If the
// instrument defines an unknown aggregation, an error is returned.
func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) {
// TODO (#3011): If filtering is done by the Aggregator it should be passed
// here.
var (
temporality = i.pipeline.reader.temporality(inst.Kind)
monotonic bool
)
switch inst.Kind {
case view.AsyncCounter, view.SyncCounter, view.SyncHistogram:
monotonic = true
}
switch agg := inst.Aggregation.(type) {
case aggregation.Drop:
return nil, nil
case aggregation.LastValue:
return internal.NewLastValue[N](), nil
case aggregation.Sum:
if temporality == metricdata.CumulativeTemporality {
return internal.NewCumulativeSum[N](monotonic), nil
}
return internal.NewDeltaSum[N](monotonic), nil
case aggregation.ExplicitBucketHistogram:
if temporality == metricdata.CumulativeTemporality {
return internal.NewCumulativeHistogram[N](agg), nil
}
return internal.NewDeltaHistogram[N](agg), nil
}
return nil, errUnknownAggregation
}
// isAggregatorCompatible checks if the aggregation can be used by the instrument.
// Current compatibility:
//
// | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram |
// |----------------------|------|-----------|-----|-----------|-----------------------|
// | Sync Counter | X | | X | X | X |
// | Sync UpDown Counter | X | | X | | |
// | Sync Histogram | X | | X | X | X |
// | Async Counter | X | | X | | |
// | Async UpDown Counter | X | | X | | |
// | Async Gauge | X | X | | | |.
func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregation) error {
switch agg.(type) {
case aggregation.ExplicitBucketHistogram:
if kind == view.SyncCounter || kind == view.SyncHistogram {
return nil
}
// TODO: review need for aggregation check after
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
return errIncompatibleAggregation
case aggregation.Sum:
switch kind {
case view.AsyncCounter, view.AsyncUpDownCounter, view.SyncCounter, view.SyncHistogram, view.SyncUpDownCounter:
return nil
default:
// TODO: review need for aggregation check after
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
return errIncompatibleAggregation
}
case aggregation.LastValue:
if kind == view.AsyncGauge {
return nil
}
// TODO: review need for aggregation check after
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
return errIncompatibleAggregation
case aggregation.Drop:
return nil
default:
// This is reached after the default aggregation check, so it should be an error at this point.
return fmt.Errorf("%w: %v", errUnknownAggregation, agg)
}
}
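The same matrix could be encoded as data rather than a type switch; a sketch of that alternative (the map below mirrors the doc-comment table and is not part of the SDK; the exponential histogram column is omitted for brevity):

package main

import "fmt"

type kind string
type agg string

// compatible mirrors the table above. Drop is omitted because it is
// compatible with every instrument kind.
var compatible = map[kind]map[agg]bool{
	"SyncCounter":        {"Sum": true, "Histogram": true},
	"SyncUpDownCounter":  {"Sum": true},
	"SyncHistogram":      {"Sum": true, "Histogram": true},
	"AsyncCounter":       {"Sum": true},
	"AsyncUpDownCounter": {"Sum": true},
	"AsyncGauge":         {"LastValue": true},
}

func isCompatible(k kind, a agg) bool {
	return a == "Drop" || compatible[k][a]
}

func main() {
	fmt.Println(isCompatible("AsyncGauge", "LastValue")) // true
	fmt.Println(isCompatible("AsyncGauge", "Sum"))       // false
}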
// pipelines is the group of pipelines connecting Readers with instrument
// measurement.
type pipelines []*pipeline
func newPipelines(res *resource.Resource, readers map[Reader][]view.View) pipelines {
pipes := make([]*pipeline, 0, len(readers))
for r, v := range readers {
p := &pipeline{
resource: res,
reader: r,
views: v,
}
r.register(p)
pipes = append(pipes, p)
}
return pipes
}
// TODO (#3053) Only register callbacks if any instrument matches in a view.
func (reg *pipelineRegistry) registerCallback(fn func(context.Context)) {
for _, pipe := range reg.pipelines {
func (p pipelines) registerCallback(fn func(context.Context)) {
for _, pipe := range p {
pipe.addCallback(fn)
}
}
// createAggregators will create all backing aggregators for an instrument.
// It will return an error if an instrument is registered more than once.
// Note: aggregators may be returned even when an error is also returned.
func createAggregators[N int64 | float64](reg *pipelineRegistry, inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
// resolver facilitates resolving the Aggregators an instrument needs to
// aggregate measurements with, while updating all pipelines that need to
// pull from those aggregations.
type resolver[N int64 | float64] struct {
inserters []*inserter[N]
}
func newResolver[N int64 | float64](p pipelines) *resolver[N] {
in := make([]*inserter[N], len(p))
for i := range in {
in[i] = newInserter[N](p[i])
}
return &resolver[N]{in}
}
// Aggregators returns the Aggregators instrument inst needs to update when it
// makes a measurement.
func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
var aggs []internal.Aggregator[N]
errs := &multierror{}
for rdr, views := range reg.views {
pipe := reg.pipelines[rdr]
rdrAggs, err := createAggregatorsForReader[N](rdr, views, inst)
for _, i := range r.inserters {
a, err := i.Instrument(inst, instUnit)
if err != nil {
errs.append(err)
}
for inst, agg := range rdrAggs {
err := pipe.addAggregator(inst.scope, inst.name, inst.description, instUnit, agg)
if err != nil {
errs.append(err)
}
aggs = append(aggs, agg)
}
aggs = append(aggs, a...)
}
return aggs, errs.errorOrNil()
}
@@ -221,126 +387,3 @@ func (m *multierror) errorOrNil() error {
func (m *multierror) append(err error) {
m.errors = append(m.errors, err.Error())
}
// instrumentID is used to identify multiple instruments being mapped to the same aggregator.
// e.g. using an exact match view with a name=* view.
// You can't use a view.Instrument here because not all Aggregators are comparable.
type instrumentID struct {
scope instrumentation.Scope
name string
description string
}
var errCreatingAggregators = errors.New("could not create all aggregators")
func createAggregatorsForReader[N int64 | float64](rdr Reader, views []view.View, inst view.Instrument) (map[instrumentID]internal.Aggregator[N], error) {
aggs := map[instrumentID]internal.Aggregator[N]{}
errs := &multierror{
wrapped: errCreatingAggregators,
}
for _, v := range views {
inst, match := v.TransformInstrument(inst)
ident := instrumentID{
scope: inst.Scope,
name: inst.Name,
description: inst.Description,
}
if _, ok := aggs[ident]; ok || !match {
continue
}
if inst.Aggregation == nil {
inst.Aggregation = rdr.aggregation(inst.Kind)
} else if _, ok := inst.Aggregation.(aggregation.Default); ok {
inst.Aggregation = rdr.aggregation(inst.Kind)
}
if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil {
err = fmt.Errorf("creating aggregator with instrumentKind: %d, aggregation %v: %w", inst.Kind, inst.Aggregation, err)
errs.append(err)
continue
}
agg := createAggregator[N](inst.Aggregation, rdr.temporality(inst.Kind), isMonotonic(inst.Kind))
if agg != nil {
// TODO (#3011): If filtering is done at the instrument level add here.
// This is where the aggregator and the view are both in scope.
aggs[ident] = agg
}
}
return aggs, errs.errorOrNil()
}
func isMonotonic(kind view.InstrumentKind) bool {
switch kind {
case view.AsyncCounter, view.SyncCounter, view.SyncHistogram:
return true
}
return false
}
// createAggregator takes the config (Aggregation and Temporality) and produces a memory backed Aggregator.
// TODO (#3011): If filtering is done by the Aggregator it should be passed here.
func createAggregator[N int64 | float64](agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) internal.Aggregator[N] {
switch agg := agg.(type) {
case aggregation.Drop:
return nil
case aggregation.LastValue:
return internal.NewLastValue[N]()
case aggregation.Sum:
if temporality == metricdata.CumulativeTemporality {
return internal.NewCumulativeSum[N](monotonic)
}
return internal.NewDeltaSum[N](monotonic)
case aggregation.ExplicitBucketHistogram:
if temporality == metricdata.CumulativeTemporality {
return internal.NewCumulativeHistogram[N](agg)
}
return internal.NewDeltaHistogram[N](agg)
}
return nil
}
// TODO: review need for aggregation check after https://github.com/open-telemetry/opentelemetry-specification/issues/2710
var errIncompatibleAggregation = errors.New("incompatible aggregation")
var errUnknownAggregation = errors.New("unrecognized aggregation")
// isAggregatorCompatible checks if the aggregation can be used by the instrument.
// Current compatibility:
//
// | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram |
// |----------------------|------|-----------|-----|-----------|-----------------------|
// | Sync Counter | X | | X | X | X |
// | Sync UpDown Counter | X | | X | | |
// | Sync Histogram | X | | X | X | X |
// | Async Counter | X | | X | | |
// | Async UpDown Counter | X | | X | | |
// | Async Gauge | X | X | | | |.
func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregation) error {
switch agg.(type) {
case aggregation.ExplicitBucketHistogram:
if kind == view.SyncCounter || kind == view.SyncHistogram {
return nil
}
return errIncompatibleAggregation
case aggregation.Sum:
switch kind {
case view.AsyncCounter, view.AsyncUpDownCounter, view.SyncCounter, view.SyncHistogram, view.SyncUpDownCounter:
return nil
default:
return errIncompatibleAggregation
}
case aggregation.LastValue:
if kind == view.AsyncGauge {
return nil
}
return errIncompatibleAggregation
case aggregation.Drop:
return nil
default:
// This is reached after the default aggregation check, so it should be an error at this point.
return fmt.Errorf("%w: %v", errUnknownAggregation, agg)
}
}