You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-08-24 19:39:14 +02:00
```go type interInst struct { x int } type inter interface { } var sink []inter func BenchmarkX(b *testing.B) { sink = make([]inter, b.N) for i := 0; i < b.N; i++ { sink[i] = &interInst{} } clear(sink) sink = sink[:0] runtime.GC() var ms runtime.MemStats runtime.ReadMemStats(&ms) b.Log(b.N, ms.Frees) // Frees is the cumulative count of heap objects freed. } ``` ``` clear: ioz_test.go:35: 1 589 ioz_test.go:35: 100 711 ioz_test.go:35: 10000 10729 ioz_test.go:35: 1000000 1010750 <-- 1m+ freed ioz_test.go:35: 16076874 17087643 ioz_test.go:35: 19514749 36602412 ``` ``` no clear: ioz_test.go:35: 1 585 ioz_test.go:35: 100 606 ioz_test.go:35: 10000 725 ioz_test.go:35: 1000000 10745 <-- some "overhead" objects freed, not the slice. ioz_test.go:35: 16391445 1010765 ioz_test.go:35: 21765238 17402230 ``` This is documented at https://go.dev/wiki/SliceTricks: > NOTE If the type of the element is a pointer or a struct with pointer fields, which need to be garbage collected, the above implementations of Cut and Delete have a potential memory leak problem: some elements with values are still referenced by slice a’s underlying array, just not “visible” in the slice. Because the “deleted” value is referenced in the underlying array, the deleted value is still “reachable” during GC, even though the value cannot be referenced by your code. If the underlying array is long-lived, this represents a leak. Followed by examples of how zeroing out the slice elements solves the problem. This PR does the same.
660 lines
22 KiB
Go
660 lines
22 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package metric // import "go.opentelemetry.io/otel/sdk/metric"
|
|
|
|
import (
|
|
"container/list"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"go.opentelemetry.io/otel/internal/global"
|
|
"go.opentelemetry.io/otel/metric/embedded"
|
|
"go.opentelemetry.io/otel/sdk/instrumentation"
|
|
"go.opentelemetry.io/otel/sdk/metric/exemplar"
|
|
"go.opentelemetry.io/otel/sdk/metric/internal"
|
|
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
|
|
"go.opentelemetry.io/otel/sdk/metric/internal/x"
|
|
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
|
"go.opentelemetry.io/otel/sdk/resource"
|
|
)
|
|
|
|
var (
|
|
errCreatingAggregators = errors.New("could not create all aggregators")
|
|
errIncompatibleAggregation = errors.New("incompatible aggregation")
|
|
errUnknownAggregation = errors.New("unrecognized aggregation")
|
|
)
|
|
|
|
// instrumentSync is a synchronization point between a pipeline and an
|
|
// instrument's aggregate function.
|
|
type instrumentSync struct {
|
|
name string
|
|
description string
|
|
unit string
|
|
compAgg aggregate.ComputeAggregation
|
|
}
|
|
|
|
func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFilter exemplar.Filter) *pipeline {
|
|
if res == nil {
|
|
res = resource.Empty()
|
|
}
|
|
return &pipeline{
|
|
resource: res,
|
|
reader: reader,
|
|
views: views,
|
|
int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{},
|
|
float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{},
|
|
exemplarFilter: exemplarFilter,
|
|
// aggregations is lazy allocated when needed.
|
|
}
|
|
}
|
|
|
|
// pipeline connects all of the instruments created by a meter provider to a Reader.
|
|
// This is the object that will be `Reader.register()` when a meter provider is created.
|
|
//
|
|
// As instruments are created the instrument should be checked if it exists in
|
|
// the views of a the Reader, and if so each aggregate function should be added
|
|
// to the pipeline.
|
|
type pipeline struct {
|
|
resource *resource.Resource
|
|
|
|
reader Reader
|
|
views []View
|
|
|
|
sync.Mutex
|
|
int64Measures map[observableID[int64]][]aggregate.Measure[int64]
|
|
float64Measures map[observableID[float64]][]aggregate.Measure[float64]
|
|
aggregations map[instrumentation.Scope][]instrumentSync
|
|
callbacks []func(context.Context) error
|
|
multiCallbacks list.List
|
|
exemplarFilter exemplar.Filter
|
|
}
|
|
|
|
// addInt64Measure adds a new int64 measure to the pipeline for each observer.
|
|
func (p *pipeline) addInt64Measure(id observableID[int64], m []aggregate.Measure[int64]) {
|
|
p.Lock()
|
|
defer p.Unlock()
|
|
p.int64Measures[id] = m
|
|
}
|
|
|
|
// addFloat64Measure adds a new float64 measure to the pipeline for each observer.
|
|
func (p *pipeline) addFloat64Measure(id observableID[float64], m []aggregate.Measure[float64]) {
|
|
p.Lock()
|
|
defer p.Unlock()
|
|
p.float64Measures[id] = m
|
|
}
|
|
|
|
// addSync adds the instrumentSync to pipeline p with scope. This method is not
|
|
// idempotent. Duplicate calls will result in duplicate additions, it is the
|
|
// callers responsibility to ensure this is called with unique values.
|
|
func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) {
|
|
p.Lock()
|
|
defer p.Unlock()
|
|
if p.aggregations == nil {
|
|
p.aggregations = map[instrumentation.Scope][]instrumentSync{
|
|
scope: {iSync},
|
|
}
|
|
return
|
|
}
|
|
p.aggregations[scope] = append(p.aggregations[scope], iSync)
|
|
}
|
|
|
|
type multiCallback func(context.Context) error
|
|
|
|
// addMultiCallback registers a multi-instrument callback to be run when
|
|
// `produce()` is called.
|
|
func (p *pipeline) addMultiCallback(c multiCallback) (unregister func()) {
|
|
p.Lock()
|
|
defer p.Unlock()
|
|
e := p.multiCallbacks.PushBack(c)
|
|
return func() {
|
|
p.Lock()
|
|
p.multiCallbacks.Remove(e)
|
|
p.Unlock()
|
|
}
|
|
}
|
|
|
|
// produce returns aggregated metrics from a single collection.
|
|
//
|
|
// This method is safe to call concurrently.
|
|
func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error {
|
|
p.Lock()
|
|
defer p.Unlock()
|
|
|
|
var err error
|
|
for _, c := range p.callbacks {
|
|
// TODO make the callbacks parallel. ( #3034 )
|
|
if e := c(ctx); e != nil {
|
|
err = errors.Join(err, e)
|
|
}
|
|
if err := ctx.Err(); err != nil {
|
|
rm.Resource = nil
|
|
clear(rm.ScopeMetrics) // Erase elements to let GC collect objects.
|
|
rm.ScopeMetrics = rm.ScopeMetrics[:0]
|
|
return err
|
|
}
|
|
}
|
|
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
|
|
// TODO make the callbacks parallel. ( #3034 )
|
|
f := e.Value.(multiCallback)
|
|
if e := f(ctx); e != nil {
|
|
err = errors.Join(err, e)
|
|
}
|
|
if err := ctx.Err(); err != nil {
|
|
// This means the context expired before we finished running callbacks.
|
|
rm.Resource = nil
|
|
clear(rm.ScopeMetrics) // Erase elements to let GC collect objects.
|
|
rm.ScopeMetrics = rm.ScopeMetrics[:0]
|
|
return err
|
|
}
|
|
}
|
|
|
|
rm.Resource = p.resource
|
|
rm.ScopeMetrics = internal.ReuseSlice(rm.ScopeMetrics, len(p.aggregations))
|
|
|
|
i := 0
|
|
for scope, instruments := range p.aggregations {
|
|
rm.ScopeMetrics[i].Metrics = internal.ReuseSlice(rm.ScopeMetrics[i].Metrics, len(instruments))
|
|
j := 0
|
|
for _, inst := range instruments {
|
|
data := rm.ScopeMetrics[i].Metrics[j].Data
|
|
if n := inst.compAgg(&data); n > 0 {
|
|
rm.ScopeMetrics[i].Metrics[j].Name = inst.name
|
|
rm.ScopeMetrics[i].Metrics[j].Description = inst.description
|
|
rm.ScopeMetrics[i].Metrics[j].Unit = inst.unit
|
|
rm.ScopeMetrics[i].Metrics[j].Data = data
|
|
j++
|
|
}
|
|
}
|
|
rm.ScopeMetrics[i].Metrics = rm.ScopeMetrics[i].Metrics[:j]
|
|
if len(rm.ScopeMetrics[i].Metrics) > 0 {
|
|
rm.ScopeMetrics[i].Scope = scope
|
|
i++
|
|
}
|
|
}
|
|
|
|
rm.ScopeMetrics = rm.ScopeMetrics[:i]
|
|
|
|
return err
|
|
}
|
|
|
|
// inserter facilitates inserting of new instruments from a single scope into a
|
|
// pipeline.
|
|
type inserter[N int64 | float64] struct {
|
|
// aggregators is a cache that holds aggregate function inputs whose
|
|
// outputs have been inserted into the underlying reader pipeline. This
|
|
// cache ensures no duplicate aggregate functions are inserted into the
|
|
// reader pipeline and if a new request during an instrument creation asks
|
|
// for the same aggregate function input the same instance is returned.
|
|
aggregators *cache[instID, aggVal[N]]
|
|
|
|
// views is a cache that holds instrument identifiers for all the
|
|
// instruments a Meter has created, it is provided from the Meter that owns
|
|
// this inserter. This cache ensures during the creation of instruments
|
|
// with the same name but different options (e.g. description, unit) a
|
|
// warning message is logged.
|
|
views *cache[string, instID]
|
|
|
|
pipeline *pipeline
|
|
}
|
|
|
|
func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *inserter[N] {
|
|
if vc == nil {
|
|
vc = &cache[string, instID]{}
|
|
}
|
|
return &inserter[N]{
|
|
aggregators: &cache[instID, aggVal[N]]{},
|
|
views: vc,
|
|
pipeline: p,
|
|
}
|
|
}
|
|
|
|
// Instrument inserts the instrument inst with instUnit into a pipeline. All
|
|
// views the pipeline contains are matched against, and any matching view that
|
|
// creates a unique aggregate function will have its output inserted into the
|
|
// pipeline and its input included in the returned slice.
|
|
//
|
|
// The returned aggregate function inputs are ensured to be deduplicated and
|
|
// unique. If another view in another pipeline that is cached by this
|
|
// inserter's cache has already inserted the same aggregate function for the
|
|
// same instrument, that functions input instance is returned.
|
|
//
|
|
// If another instrument has already been inserted by this inserter, or any
|
|
// other using the same cache, and it conflicts with the instrument being
|
|
// inserted in this call, an aggregate function input matching the arguments
|
|
// will still be returned but an Info level log message will also be logged to
|
|
// the OTel global logger.
|
|
//
|
|
// If the passed instrument would result in an incompatible aggregate function,
|
|
// an error is returned and that aggregate function output is not inserted nor
|
|
// is its input returned.
|
|
//
|
|
// If an instrument is determined to use a Drop aggregation, that instrument is
|
|
// not inserted nor returned.
|
|
func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation) ([]aggregate.Measure[N], error) {
|
|
var (
|
|
matched bool
|
|
measures []aggregate.Measure[N]
|
|
)
|
|
|
|
var err error
|
|
seen := make(map[uint64]struct{})
|
|
for _, v := range i.pipeline.views {
|
|
stream, match := v(inst)
|
|
if !match {
|
|
continue
|
|
}
|
|
matched = true
|
|
in, id, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
|
|
if e != nil {
|
|
err = errors.Join(err, e)
|
|
}
|
|
if in == nil { // Drop aggregation.
|
|
continue
|
|
}
|
|
if _, ok := seen[id]; ok {
|
|
// This aggregate function has already been added.
|
|
continue
|
|
}
|
|
seen[id] = struct{}{}
|
|
measures = append(measures, in)
|
|
}
|
|
|
|
if err != nil {
|
|
err = errors.Join(errCreatingAggregators, err)
|
|
}
|
|
|
|
if matched {
|
|
return measures, err
|
|
}
|
|
|
|
// Apply implicit default view if no explicit matched.
|
|
stream := Stream{
|
|
Name: inst.Name,
|
|
Description: inst.Description,
|
|
Unit: inst.Unit,
|
|
}
|
|
in, _, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
|
|
if e != nil {
|
|
if err == nil {
|
|
err = errCreatingAggregators
|
|
}
|
|
err = errors.Join(err, e)
|
|
}
|
|
if in != nil {
|
|
// Ensured to have not seen given matched was false.
|
|
measures = append(measures, in)
|
|
}
|
|
return measures, err
|
|
}
|
|
|
|
// addCallback registers a single instrument callback to be run when
|
|
// `produce()` is called.
|
|
func (i *inserter[N]) addCallback(cback func(context.Context) error) {
|
|
i.pipeline.Lock()
|
|
defer i.pipeline.Unlock()
|
|
i.pipeline.callbacks = append(i.pipeline.callbacks, cback)
|
|
}
|
|
|
|
var aggIDCount uint64
|
|
|
|
// aggVal is the cached value in an aggregators cache.
|
|
type aggVal[N int64 | float64] struct {
|
|
ID uint64
|
|
Measure aggregate.Measure[N]
|
|
Err error
|
|
}
|
|
|
|
// readerDefaultAggregation returns the default aggregation for the instrument
|
|
// kind based on the reader's aggregation preferences. This is used unless the
|
|
// aggregation is overridden with a view.
|
|
func (i *inserter[N]) readerDefaultAggregation(kind InstrumentKind) Aggregation {
|
|
aggregation := i.pipeline.reader.aggregation(kind)
|
|
switch aggregation.(type) {
|
|
case nil, AggregationDefault:
|
|
// If the reader returns default or nil use the default selector.
|
|
aggregation = DefaultAggregationSelector(kind)
|
|
default:
|
|
// Deep copy and validate before using.
|
|
aggregation = aggregation.copy()
|
|
if err := aggregation.err(); err != nil {
|
|
orig := aggregation
|
|
aggregation = DefaultAggregationSelector(kind)
|
|
global.Error(
|
|
err, "using default aggregation instead",
|
|
"aggregation", orig,
|
|
"replacement", aggregation,
|
|
)
|
|
}
|
|
}
|
|
return aggregation
|
|
}
|
|
|
|
// cachedAggregator returns the appropriate aggregate input and output
|
|
// functions for an instrument configuration. If the exact instrument has been
|
|
// created within the inst.Scope, those aggregate function instances will be
|
|
// returned. Otherwise, new computed aggregate functions will be cached and
|
|
// returned.
|
|
//
|
|
// If the instrument configuration conflicts with an instrument that has
|
|
// already been created (e.g. description, unit, data type) a warning will be
|
|
// logged at the "Info" level with the global OTel logger. Valid new aggregate
|
|
// functions for the instrument configuration will still be returned without an
|
|
// error.
|
|
//
|
|
// If the instrument defines an unknown or incompatible aggregation, an error
|
|
// is returned.
|
|
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream, readerAggregation Aggregation) (meas aggregate.Measure[N], aggID uint64, err error) {
|
|
switch stream.Aggregation.(type) {
|
|
case nil:
|
|
// The aggregation was not overridden with a view. Use the aggregation
|
|
// provided by the reader.
|
|
stream.Aggregation = readerAggregation
|
|
case AggregationDefault:
|
|
// The view explicitly requested the default aggregation.
|
|
stream.Aggregation = DefaultAggregationSelector(kind)
|
|
}
|
|
if stream.ExemplarReservoirProviderSelector == nil {
|
|
stream.ExemplarReservoirProviderSelector = DefaultExemplarReservoirProviderSelector
|
|
}
|
|
|
|
if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {
|
|
return nil, 0, fmt.Errorf(
|
|
"creating aggregator with instrumentKind: %d, aggregation %v: %w",
|
|
kind, stream.Aggregation, err,
|
|
)
|
|
}
|
|
|
|
id := i.instID(kind, stream)
|
|
// If there is a conflict, the specification says the view should
|
|
// still be applied and a warning should be logged.
|
|
i.logConflict(id)
|
|
|
|
// If there are requests for the same instrument with different name
|
|
// casing, the first-seen needs to be returned. Use a normalize ID for the
|
|
// cache lookup to ensure the correct comparison.
|
|
normID := id.normalize()
|
|
cv := i.aggregators.Lookup(normID, func() aggVal[N] {
|
|
b := aggregate.Builder[N]{
|
|
Temporality: i.pipeline.reader.temporality(kind),
|
|
ReservoirFunc: reservoirFunc[N](stream.ExemplarReservoirProviderSelector(stream.Aggregation), i.pipeline.exemplarFilter),
|
|
}
|
|
b.Filter = stream.AttributeFilter
|
|
// A value less than or equal to zero will disable the aggregation
|
|
// limits for the builder (an all the created aggregates).
|
|
// CardinalityLimit.Lookup returns 0 by default if unset (or
|
|
// unrecognized input). Use that value directly.
|
|
b.AggregationLimit, _ = x.CardinalityLimit.Lookup()
|
|
|
|
in, out, err := i.aggregateFunc(b, stream.Aggregation, kind)
|
|
if err != nil {
|
|
return aggVal[N]{0, nil, err}
|
|
}
|
|
if in == nil { // Drop aggregator.
|
|
return aggVal[N]{0, nil, nil}
|
|
}
|
|
i.pipeline.addSync(scope, instrumentSync{
|
|
// Use the first-seen name casing for this and all subsequent
|
|
// requests of this instrument.
|
|
name: stream.Name,
|
|
description: stream.Description,
|
|
unit: stream.Unit,
|
|
compAgg: out,
|
|
})
|
|
id := atomic.AddUint64(&aggIDCount, 1)
|
|
return aggVal[N]{id, in, err}
|
|
})
|
|
return cv.Measure, cv.ID, cv.Err
|
|
}
|
|
|
|
// logConflict validates if an instrument with the same case-insensitive name
|
|
// as id has already been created. If that instrument conflicts with id, a
|
|
// warning is logged.
|
|
func (i *inserter[N]) logConflict(id instID) {
|
|
// The API specification defines names as case-insensitive. If there is a
|
|
// different casing of a name it needs to be a conflict.
|
|
name := id.normalize().Name
|
|
existing := i.views.Lookup(name, func() instID { return id })
|
|
if id == existing {
|
|
return
|
|
}
|
|
|
|
const msg = "duplicate metric stream definitions"
|
|
args := []interface{}{
|
|
"names", fmt.Sprintf("%q, %q", existing.Name, id.Name),
|
|
"descriptions", fmt.Sprintf("%q, %q", existing.Description, id.Description),
|
|
"kinds", fmt.Sprintf("%s, %s", existing.Kind, id.Kind),
|
|
"units", fmt.Sprintf("%s, %s", existing.Unit, id.Unit),
|
|
"numbers", fmt.Sprintf("%s, %s", existing.Number, id.Number),
|
|
}
|
|
|
|
// The specification recommends logging a suggested view to resolve
|
|
// conflicts if possible.
|
|
//
|
|
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#duplicate-instrument-registration
|
|
if id.Unit != existing.Unit || id.Number != existing.Number {
|
|
// There is no view resolution for these, don't make a suggestion.
|
|
global.Warn(msg, args...)
|
|
return
|
|
}
|
|
|
|
var stream string
|
|
if id.Name != existing.Name || id.Kind != existing.Kind {
|
|
stream = `Stream{Name: "{{NEW_NAME}}"}`
|
|
} else if id.Description != existing.Description {
|
|
stream = fmt.Sprintf("Stream{Description: %q}", existing.Description)
|
|
}
|
|
|
|
inst := fmt.Sprintf(
|
|
"Instrument{Name: %q, Description: %q, Kind: %q, Unit: %q}",
|
|
id.Name, id.Description, "InstrumentKind"+id.Kind.String(), id.Unit,
|
|
)
|
|
args = append(args, "suggested.view", fmt.Sprintf("NewView(%s, %s)", inst, stream))
|
|
|
|
global.Warn(msg, args...)
|
|
}
|
|
|
|
func (i *inserter[N]) instID(kind InstrumentKind, stream Stream) instID {
|
|
var zero N
|
|
return instID{
|
|
Name: stream.Name,
|
|
Description: stream.Description,
|
|
Unit: stream.Unit,
|
|
Kind: kind,
|
|
Number: fmt.Sprintf("%T", zero),
|
|
}
|
|
}
|
|
|
|
// aggregateFunc returns new aggregate functions matching agg, kind, and
|
|
// monotonic. If the agg is unknown or temporality is invalid, an error is
|
|
// returned.
|
|
func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg Aggregation, kind InstrumentKind) (meas aggregate.Measure[N], comp aggregate.ComputeAggregation, err error) {
|
|
switch a := agg.(type) {
|
|
case AggregationDefault:
|
|
return i.aggregateFunc(b, DefaultAggregationSelector(kind), kind)
|
|
case AggregationDrop:
|
|
// Return nil in and out to signify the drop aggregator.
|
|
case AggregationLastValue:
|
|
switch kind {
|
|
case InstrumentKindGauge:
|
|
meas, comp = b.LastValue()
|
|
case InstrumentKindObservableGauge:
|
|
meas, comp = b.PrecomputedLastValue()
|
|
}
|
|
case AggregationSum:
|
|
switch kind {
|
|
case InstrumentKindObservableCounter:
|
|
meas, comp = b.PrecomputedSum(true)
|
|
case InstrumentKindObservableUpDownCounter:
|
|
meas, comp = b.PrecomputedSum(false)
|
|
case InstrumentKindCounter, InstrumentKindHistogram:
|
|
meas, comp = b.Sum(true)
|
|
default:
|
|
// InstrumentKindUpDownCounter, InstrumentKindObservableGauge, and
|
|
// instrumentKindUndefined or other invalid instrument kinds.
|
|
meas, comp = b.Sum(false)
|
|
}
|
|
case AggregationExplicitBucketHistogram:
|
|
var noSum bool
|
|
switch kind {
|
|
case InstrumentKindUpDownCounter, InstrumentKindObservableUpDownCounter, InstrumentKindObservableGauge, InstrumentKindGauge:
|
|
// The sum should not be collected for any instrument that can make
|
|
// negative measurements:
|
|
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#histogram-aggregations
|
|
noSum = true
|
|
}
|
|
meas, comp = b.ExplicitBucketHistogram(a.Boundaries, a.NoMinMax, noSum)
|
|
case AggregationBase2ExponentialHistogram:
|
|
var noSum bool
|
|
switch kind {
|
|
case InstrumentKindUpDownCounter, InstrumentKindObservableUpDownCounter, InstrumentKindObservableGauge, InstrumentKindGauge:
|
|
// The sum should not be collected for any instrument that can make
|
|
// negative measurements:
|
|
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#histogram-aggregations
|
|
noSum = true
|
|
}
|
|
meas, comp = b.ExponentialBucketHistogram(a.MaxSize, a.MaxScale, a.NoMinMax, noSum)
|
|
|
|
default:
|
|
err = errUnknownAggregation
|
|
}
|
|
|
|
return meas, comp, err
|
|
}
|
|
|
|
// isAggregatorCompatible checks if the aggregation can be used by the instrument.
|
|
// Current compatibility:
|
|
//
|
|
// | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram |
|
|
// |--------------------------|------|-----------|-----|-----------|-----------------------|
|
|
// | Counter | ✓ | | ✓ | ✓ | ✓ |
|
|
// | UpDownCounter | ✓ | | ✓ | ✓ | ✓ |
|
|
// | Histogram | ✓ | | ✓ | ✓ | ✓ |
|
|
// | Gauge | ✓ | ✓ | | ✓ | ✓ |
|
|
// | Observable Counter | ✓ | | ✓ | ✓ | ✓ |
|
|
// | Observable UpDownCounter | ✓ | | ✓ | ✓ | ✓ |
|
|
// | Observable Gauge | ✓ | ✓ | | ✓ | ✓ |.
|
|
func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error {
|
|
switch agg.(type) {
|
|
case AggregationDefault:
|
|
return nil
|
|
case AggregationExplicitBucketHistogram, AggregationBase2ExponentialHistogram:
|
|
switch kind {
|
|
case InstrumentKindCounter,
|
|
InstrumentKindUpDownCounter,
|
|
InstrumentKindHistogram,
|
|
InstrumentKindGauge,
|
|
InstrumentKindObservableCounter,
|
|
InstrumentKindObservableUpDownCounter,
|
|
InstrumentKindObservableGauge:
|
|
return nil
|
|
default:
|
|
return errIncompatibleAggregation
|
|
}
|
|
case AggregationSum:
|
|
switch kind {
|
|
case InstrumentKindObservableCounter, InstrumentKindObservableUpDownCounter, InstrumentKindCounter, InstrumentKindHistogram, InstrumentKindUpDownCounter:
|
|
return nil
|
|
default:
|
|
// TODO: review need for aggregation check after
|
|
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
|
|
return errIncompatibleAggregation
|
|
}
|
|
case AggregationLastValue:
|
|
switch kind {
|
|
case InstrumentKindObservableGauge, InstrumentKindGauge:
|
|
return nil
|
|
}
|
|
// TODO: review need for aggregation check after
|
|
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
|
|
return errIncompatibleAggregation
|
|
case AggregationDrop:
|
|
return nil
|
|
default:
|
|
// This is used passed checking for default, it should be an error at this point.
|
|
return fmt.Errorf("%w: %v", errUnknownAggregation, agg)
|
|
}
|
|
}
|
|
|
|
// pipelines is the group of pipelines connecting Readers with instrument
|
|
// measurement.
|
|
type pipelines []*pipeline
|
|
|
|
func newPipelines(res *resource.Resource, readers []Reader, views []View, exemplarFilter exemplar.Filter) pipelines {
|
|
pipes := make([]*pipeline, 0, len(readers))
|
|
for _, r := range readers {
|
|
p := newPipeline(res, r, views, exemplarFilter)
|
|
r.register(p)
|
|
pipes = append(pipes, p)
|
|
}
|
|
return pipes
|
|
}
|
|
|
|
type unregisterFuncs struct {
|
|
embedded.Registration
|
|
f []func()
|
|
}
|
|
|
|
func (u unregisterFuncs) Unregister() error {
|
|
for _, f := range u.f {
|
|
f()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// resolver facilitates resolving aggregate functions an instrument calls to
|
|
// aggregate measurements with while updating all pipelines that need to pull
|
|
// from those aggregations.
|
|
type resolver[N int64 | float64] struct {
|
|
inserters []*inserter[N]
|
|
}
|
|
|
|
func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) resolver[N] {
|
|
in := make([]*inserter[N], len(p))
|
|
for i := range in {
|
|
in[i] = newInserter[N](p[i], vc)
|
|
}
|
|
return resolver[N]{in}
|
|
}
|
|
|
|
// Aggregators returns the Aggregators that must be updated by the instrument
|
|
// defined by key.
|
|
func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error) {
|
|
var measures []aggregate.Measure[N]
|
|
|
|
var err error
|
|
for _, i := range r.inserters {
|
|
in, e := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
|
|
if e != nil {
|
|
err = errors.Join(err, e)
|
|
}
|
|
measures = append(measures, in...)
|
|
}
|
|
return measures, err
|
|
}
|
|
|
|
// HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
|
|
// defined by key. If boundaries were provided on instrument instantiation, those take precedence
|
|
// over boundaries provided by the reader.
|
|
func (r resolver[N]) HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[N], error) {
|
|
var measures []aggregate.Measure[N]
|
|
|
|
var err error
|
|
for _, i := range r.inserters {
|
|
agg := i.readerDefaultAggregation(id.Kind)
|
|
if histAgg, ok := agg.(AggregationExplicitBucketHistogram); ok && len(boundaries) > 0 {
|
|
histAgg.Boundaries = boundaries
|
|
agg = histAgg
|
|
}
|
|
in, e := i.Instrument(id, agg)
|
|
if e != nil {
|
|
err = errors.Join(err, e)
|
|
}
|
|
measures = append(measures, in...)
|
|
}
|
|
return measures, err
|
|
}
|