mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2024-12-12 10:04:29 +02:00
c5ebbc4d4b
* Use default view if inst matches no other
Fix #3224
* Test default view applied if no match
* Add changes to changelog
* Remove unneeded views len check in WithReader
* Do not return agg if adding err-ed
* Revert "Do not return agg if adding err-ed"
This reverts commit b56efb06a7
.
406 lines
12 KiB
Go
406 lines
12 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package metric // import "go.opentelemetry.io/otel/sdk/metric"
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"go.opentelemetry.io/otel/metric/unit"
|
|
"go.opentelemetry.io/otel/sdk/instrumentation"
|
|
"go.opentelemetry.io/otel/sdk/metric/aggregation"
|
|
"go.opentelemetry.io/otel/sdk/metric/internal"
|
|
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
|
"go.opentelemetry.io/otel/sdk/metric/view"
|
|
"go.opentelemetry.io/otel/sdk/resource"
|
|
)
|
|
|
|
// errCreatingAggregators is the sentinel wrapped by the error an inserter
// returns when one or more aggregators could not be created or stored.
var errCreatingAggregators = errors.New("could not create all aggregators")

// errIncompatibleAggregation reports that an aggregation cannot be used with
// the kind of instrument it was requested for.
var errIncompatibleAggregation = errors.New("incompatible aggregation")

// errUnknownAggregation reports an aggregation type this package does not
// know how to build an aggregator for.
var errUnknownAggregation = errors.New("unrecognized aggregation")
|
|
|
|
type aggregator interface {
|
|
Aggregation() metricdata.Aggregation
|
|
}
|
|
|
|
// instrumentID is used to identify multiple instruments being mapped to the
|
|
// same aggregator. e.g. using an exact match view with a name=* view. You
|
|
// can't use a view.Instrument here because not all Aggregators are comparable.
|
|
type instrumentID struct {
|
|
scope instrumentation.Scope
|
|
name string
|
|
description string
|
|
}
|
|
|
|
type instrumentKey struct {
|
|
name string
|
|
unit unit.Unit
|
|
}
|
|
|
|
type instrumentValue struct {
|
|
description string
|
|
aggregator aggregator
|
|
}
|
|
|
|
func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipeline {
|
|
if res == nil {
|
|
res = resource.Empty()
|
|
}
|
|
return &pipeline{
|
|
resource: res,
|
|
reader: reader,
|
|
views: views,
|
|
aggregations: make(map[instrumentation.Scope]map[instrumentKey]instrumentValue),
|
|
}
|
|
}
|
|
|
|
// pipeline connects all of the instruments created by a meter provider to a Reader.
|
|
// This is the object that will be `Reader.register()` when a meter provider is created.
|
|
//
|
|
// As instruments are created the instrument should be checked if it exists in the
|
|
// views of a the Reader, and if so each aggregator should be added to the pipeline.
|
|
type pipeline struct {
|
|
resource *resource.Resource
|
|
|
|
reader Reader
|
|
views []view.View
|
|
|
|
sync.Mutex
|
|
aggregations map[instrumentation.Scope]map[instrumentKey]instrumentValue
|
|
callbacks []func(context.Context)
|
|
}
|
|
|
|
// errAlreadyRegistered is wrapped by the error addAggregator returns when an
// instrument with the same name and unit already exists in the scope.
var (
	errAlreadyRegistered = errors.New("instrument already registered")
)
|
|
|
|
// addAggregator will stores an aggregator with an instrument description. The aggregator
|
|
// is used when `produce()` is called.
|
|
func (p *pipeline) addAggregator(scope instrumentation.Scope, name, description string, instUnit unit.Unit, agg aggregator) error {
|
|
p.Lock()
|
|
defer p.Unlock()
|
|
if p.aggregations == nil {
|
|
p.aggregations = map[instrumentation.Scope]map[instrumentKey]instrumentValue{}
|
|
}
|
|
if p.aggregations[scope] == nil {
|
|
p.aggregations[scope] = map[instrumentKey]instrumentValue{}
|
|
}
|
|
inst := instrumentKey{
|
|
name: name,
|
|
unit: instUnit,
|
|
}
|
|
if _, ok := p.aggregations[scope][inst]; ok {
|
|
return fmt.Errorf("%w: name %s, scope: %s", errAlreadyRegistered, name, scope)
|
|
}
|
|
|
|
p.aggregations[scope][inst] = instrumentValue{
|
|
description: description,
|
|
aggregator: agg,
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// addCallback registers a callback to be run when `produce()` is called.
|
|
func (p *pipeline) addCallback(callback func(context.Context)) {
|
|
p.Lock()
|
|
defer p.Unlock()
|
|
p.callbacks = append(p.callbacks, callback)
|
|
}
|
|
|
|
// callbackKey is the context key type used to mark a context as coming from
// the SDK.
type callbackKey int

// produceKey is the context key set on the context handed to callbacks, so an
// Observe call can tell whether it is made from within a callback. The value
// zero is arbitrary; any further context keys defined by this package would
// use different integer values.
const produceKey = callbackKey(0)
|
|
|
|
// produce returns aggregated metrics from a single collection.
|
|
//
|
|
// This method is safe to call concurrently.
|
|
func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, error) {
|
|
p.Lock()
|
|
defer p.Unlock()
|
|
|
|
ctx = context.WithValue(ctx, produceKey, struct{}{})
|
|
|
|
for _, callback := range p.callbacks {
|
|
// TODO make the callbacks parallel. ( #3034 )
|
|
callback(ctx)
|
|
if err := ctx.Err(); err != nil {
|
|
// This means the context expired before we finished running callbacks.
|
|
return metricdata.ResourceMetrics{}, err
|
|
}
|
|
}
|
|
|
|
sm := make([]metricdata.ScopeMetrics, 0, len(p.aggregations))
|
|
for scope, instruments := range p.aggregations {
|
|
metrics := make([]metricdata.Metrics, 0, len(instruments))
|
|
for inst, instValue := range instruments {
|
|
data := instValue.aggregator.Aggregation()
|
|
if data != nil {
|
|
metrics = append(metrics, metricdata.Metrics{
|
|
Name: inst.name,
|
|
Description: instValue.description,
|
|
Unit: inst.unit,
|
|
Data: data,
|
|
})
|
|
}
|
|
}
|
|
if len(metrics) > 0 {
|
|
sm = append(sm, metricdata.ScopeMetrics{
|
|
Scope: scope,
|
|
Metrics: metrics,
|
|
})
|
|
}
|
|
}
|
|
|
|
return metricdata.ResourceMetrics{
|
|
Resource: p.resource,
|
|
ScopeMetrics: sm,
|
|
}, nil
|
|
}
|
|
|
|
// inserter facilitates inserting of new instruments into a pipeline.
|
|
type inserter[N int64 | float64] struct {
|
|
pipeline *pipeline
|
|
}
|
|
|
|
func newInserter[N int64 | float64](p *pipeline) *inserter[N] {
|
|
return &inserter[N]{p}
|
|
}
|
|
|
|
// Instrument inserts instrument inst with instUnit returning the Aggregators
|
|
// that need to be updated with measurments for that instrument.
|
|
func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
|
|
var matched bool
|
|
seen := map[instrumentID]struct{}{}
|
|
var aggs []internal.Aggregator[N]
|
|
errs := &multierror{wrapped: errCreatingAggregators}
|
|
for _, v := range i.pipeline.views {
|
|
inst, match := v.TransformInstrument(inst)
|
|
if !match {
|
|
continue
|
|
}
|
|
matched = true
|
|
|
|
id := instrumentID{
|
|
scope: inst.Scope,
|
|
name: inst.Name,
|
|
description: inst.Description,
|
|
}
|
|
if _, ok := seen[id]; ok {
|
|
continue
|
|
}
|
|
|
|
agg, err := i.aggregator(inst)
|
|
if err != nil {
|
|
errs.append(err)
|
|
continue
|
|
}
|
|
if agg == nil { // Drop aggregator.
|
|
continue
|
|
}
|
|
// TODO (#3011): If filtering is done at the instrument level add here.
|
|
// This is where the aggregator and the view are both in scope.
|
|
aggs = append(aggs, agg)
|
|
seen[id] = struct{}{}
|
|
err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg)
|
|
if err != nil {
|
|
errs.append(err)
|
|
}
|
|
}
|
|
|
|
if !matched { // Apply implicit default view if no explicit matched.
|
|
a, err := i.aggregator(inst)
|
|
if err != nil {
|
|
errs.append(err)
|
|
}
|
|
if a != nil {
|
|
aggs = append(aggs, a)
|
|
err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, a)
|
|
if err != nil {
|
|
errs.append(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return aggs, errs.errorOrNil()
|
|
}
|
|
|
|
// aggregator returns the Aggregator for an instrument configuration. If the
|
|
// instrument defines an unknown aggregation, an error is returned.
|
|
func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) {
|
|
switch inst.Aggregation.(type) {
|
|
case nil, aggregation.Default:
|
|
// Undefined, nil, means to use the default from the reader.
|
|
inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind)
|
|
}
|
|
|
|
if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil {
|
|
return nil, fmt.Errorf(
|
|
"creating aggregator with instrumentKind: %d, aggregation %v: %w",
|
|
inst.Kind, inst.Aggregation, err,
|
|
)
|
|
}
|
|
|
|
var (
|
|
temporality = i.pipeline.reader.temporality(inst.Kind)
|
|
monotonic bool
|
|
)
|
|
|
|
switch inst.Kind {
|
|
case view.AsyncCounter, view.SyncCounter, view.SyncHistogram:
|
|
monotonic = true
|
|
}
|
|
|
|
switch agg := inst.Aggregation.(type) {
|
|
case aggregation.Drop:
|
|
return nil, nil
|
|
case aggregation.LastValue:
|
|
return internal.NewLastValue[N](), nil
|
|
case aggregation.Sum:
|
|
if temporality == metricdata.CumulativeTemporality {
|
|
return internal.NewCumulativeSum[N](monotonic), nil
|
|
}
|
|
return internal.NewDeltaSum[N](monotonic), nil
|
|
case aggregation.ExplicitBucketHistogram:
|
|
if temporality == metricdata.CumulativeTemporality {
|
|
return internal.NewCumulativeHistogram[N](agg), nil
|
|
}
|
|
return internal.NewDeltaHistogram[N](agg), nil
|
|
}
|
|
return nil, errUnknownAggregation
|
|
}
|
|
|
|
// isAggregatorCompatible checks if the aggregation can be used by the instrument.
|
|
// Current compatibility:
|
|
//
|
|
// | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram |
|
|
// |----------------------|------|-----------|-----|-----------|-----------------------|
|
|
// | Sync Counter | X | | X | X | X |
|
|
// | Sync UpDown Counter | X | | X | | |
|
|
// | Sync Histogram | X | | X | X | X |
|
|
// | Async Counter | X | | X | | |
|
|
// | Async UpDown Counter | X | | X | | |
|
|
// | Async Gauge | X | X | | | |.
|
|
func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregation) error {
|
|
switch agg.(type) {
|
|
case aggregation.ExplicitBucketHistogram:
|
|
if kind == view.SyncCounter || kind == view.SyncHistogram {
|
|
return nil
|
|
}
|
|
// TODO: review need for aggregation check after
|
|
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
|
|
return errIncompatibleAggregation
|
|
case aggregation.Sum:
|
|
switch kind {
|
|
case view.AsyncCounter, view.AsyncUpDownCounter, view.SyncCounter, view.SyncHistogram, view.SyncUpDownCounter:
|
|
return nil
|
|
default:
|
|
// TODO: review need for aggregation check after
|
|
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
|
|
return errIncompatibleAggregation
|
|
}
|
|
case aggregation.LastValue:
|
|
if kind == view.AsyncGauge {
|
|
return nil
|
|
}
|
|
// TODO: review need for aggregation check after
|
|
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
|
|
return errIncompatibleAggregation
|
|
case aggregation.Drop:
|
|
return nil
|
|
default:
|
|
// This is used passed checking for default, it should be an error at this point.
|
|
return fmt.Errorf("%w: %v", errUnknownAggregation, agg)
|
|
}
|
|
}
|
|
|
|
// pipelines is the group of pipelines connecting Readers with instrument
|
|
// measurement.
|
|
type pipelines []*pipeline
|
|
|
|
func newPipelines(res *resource.Resource, readers map[Reader][]view.View) pipelines {
|
|
pipes := make([]*pipeline, 0, len(readers))
|
|
for r, v := range readers {
|
|
p := &pipeline{
|
|
resource: res,
|
|
reader: r,
|
|
views: v,
|
|
}
|
|
r.register(p)
|
|
pipes = append(pipes, p)
|
|
}
|
|
return pipes
|
|
}
|
|
|
|
// TODO (#3053) Only register callbacks if any instrument matches in a view.
|
|
func (p pipelines) registerCallback(fn func(context.Context)) {
|
|
for _, pipe := range p {
|
|
pipe.addCallback(fn)
|
|
}
|
|
}
|
|
|
|
// resolver facilitates resolving Aggregators an instrument needs to aggregate
|
|
// measurements with while updating all pipelines that need to pull from those
|
|
// aggregations.
|
|
type resolver[N int64 | float64] struct {
|
|
inserters []*inserter[N]
|
|
}
|
|
|
|
func newResolver[N int64 | float64](p pipelines) *resolver[N] {
|
|
in := make([]*inserter[N], len(p))
|
|
for i := range in {
|
|
in[i] = newInserter[N](p[i])
|
|
}
|
|
return &resolver[N]{in}
|
|
}
|
|
|
|
// Aggregators returns the Aggregators instrument inst needs to update when it
|
|
// makes a measurement.
|
|
func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
|
|
var aggs []internal.Aggregator[N]
|
|
|
|
errs := &multierror{}
|
|
for _, i := range r.inserters {
|
|
a, err := i.Instrument(inst, instUnit)
|
|
if err != nil {
|
|
errs.append(err)
|
|
}
|
|
aggs = append(aggs, a...)
|
|
}
|
|
return aggs, errs.errorOrNil()
|
|
}
|
|
|
|
// multierror accumulates the messages of several errors so they can be
// reported as a single error.
type multierror struct {
	// wrapped is an optional sentinel the combined error wraps; it may be nil.
	wrapped error
	// errors holds the messages of the appended errors, in append order.
	errors []string
}

// errorOrNil returns nil if no error has been appended. Otherwise it returns
// one error whose message is the appended messages joined with "; ".
//
// When wrapped is set the result wraps it (so errors.Is matches it) and its
// message is prefixed with "<wrapped>: ". When wrapped is nil a plain error
// is returned; formatting a nil error with %w would otherwise put
// "%!w(<nil>)" in the message.
func (m *multierror) errorOrNil() error {
	if len(m.errors) == 0 {
		return nil
	}
	joined := strings.Join(m.errors, "; ")
	if m.wrapped == nil {
		return errors.New(joined)
	}
	return fmt.Errorf("%w: %s", m.wrapped, joined)
}

// append records the message of err. A nil err is ignored.
func (m *multierror) append(err error) {
	if err == nil {
		return
	}
	m.errors = append(m.errors, err.Error())
}
|