You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-11-25 22:41:46 +02:00
Handle duplicate Aggregators and log instrument conflicts (#3251)
* Add the cache type * Add cache unit tests * Test cache concurrency * Add the instrumentCache * Use the instrumentCache to deduplicate creation * Drop unique check from addAggregator * Fix aggregatorCache* docs * Update cachedAggregator and aggregator method docs * Remove unnecessary type constraint * Remove unused errAlreadyRegistered * Rename to not shadow imports * Add changes to changelog * Fix changelog English * Store resolvers in the meter instead of caches * Test all Aggregator[N] impls are comparable * Fix lint * Add documentation that Aggregators need to be comparable * Update sdk/metric/internal/aggregator.go Co-authored-by: Anthony Mirabella <a9@aneurysm9.com> * Update sdk/metric/instrument.go Co-authored-by: Anthony Mirabella <a9@aneurysm9.com> * Update sdk/metric/instrument.go Co-authored-by: Anthony Mirabella <a9@aneurysm9.com> * Update sdk/metric/internal/aggregator_test.go Co-authored-by: Anthony Mirabella <a9@aneurysm9.com> * Fix pipeline_test.go use of newInstrumentCache Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
This commit is contained in:
@@ -24,6 +24,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
### Fixed
|
||||
|
||||
- Use default view if instrument does not match any registered view of a reader. (#3224, #3237)
|
||||
- Return the same instrument every time a user makes the exact same instrument creation call. (#3229, #3251)
|
||||
- Return the existing instrument when a view transforms a creation call to match an existing instrument. (#3240, #3251)
|
||||
- Log a warning when a conflicting instrument (e.g. description, unit, data-type) is created instead of returning an error. (#3251)
|
||||
- The OpenCensus bridge no longer sends empty batches of metrics. (#3263)
|
||||
|
||||
## [0.32.1] Metric SDK (Alpha) - 2022-09-22
|
||||
|
||||
110
sdk/metric/cache.go
Normal file
110
sdk/metric/cache.go
Normal file
@@ -0,0 +1,110 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metric // import "go.opentelemetry.io/otel/sdk/metric"
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal"
|
||||
)
|
||||
|
||||
// cache is a locking storage used to quickly return already computed values.
|
||||
//
|
||||
// The zero value of a cache is empty and ready to use.
|
||||
//
|
||||
// A cache must not be copied after first use.
|
||||
//
|
||||
// All methods of a cache are safe to call concurrently.
|
||||
type cache[K comparable, V any] struct {
|
||||
sync.Mutex
|
||||
data map[K]V
|
||||
}
|
||||
|
||||
// Lookup returns the value stored in the cache with the accociated key if it
|
||||
// exists. Otherwise, f is called and its returned value is set in the cache
|
||||
// for key and returned.
|
||||
//
|
||||
// Lookup is safe to call concurrently. It will hold the cache lock, so f
|
||||
// should not block excessively.
|
||||
func (c *cache[K, V]) Lookup(key K, f func() V) V {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
if c.data == nil {
|
||||
val := f()
|
||||
c.data = map[K]V{key: val}
|
||||
return val
|
||||
}
|
||||
if v, ok := c.data[key]; ok {
|
||||
return v
|
||||
}
|
||||
val := f()
|
||||
c.data[key] = val
|
||||
return val
|
||||
}
|
||||
|
||||
// instrumentCache is a cache of instruments. It is scoped at the Meter level
|
||||
// along with a number type. Meaning all instruments it contains need to belong
|
||||
// to the same instrumentation.Scope (implicitly) and number type (explicitly).
|
||||
type instrumentCache[N int64 | float64] struct {
|
||||
// aggregators is used to ensure duplicate creations of the same instrument
|
||||
// return the same instance of that instrument's aggregator.
|
||||
aggregators *cache[instrumentID, aggVal[N]]
|
||||
// views is used to ensure if instruments with the same name are created,
|
||||
// but do not have the same identifying properties, a warning is logged.
|
||||
views *cache[string, instrumentID]
|
||||
}
|
||||
|
||||
// newInstrumentCache returns a new instrumentCache that uses ac as the
|
||||
// underlying cache for aggregators and vc as the cache for views. If ac or vc
|
||||
// are nil, a new empty cache will be used.
|
||||
func newInstrumentCache[N int64 | float64](ac *cache[instrumentID, aggVal[N]], vc *cache[string, instrumentID]) instrumentCache[N] {
|
||||
if ac == nil {
|
||||
ac = &cache[instrumentID, aggVal[N]]{}
|
||||
}
|
||||
if vc == nil {
|
||||
vc = &cache[string, instrumentID]{}
|
||||
}
|
||||
return instrumentCache[N]{aggregators: ac, views: vc}
|
||||
}
|
||||
|
||||
// LookupAggregator returns the Aggregator and error for a cached instrument if
|
||||
// it exist in the cache. Otherwise, f is called and its returned value is set
|
||||
// in the cache and returned.
|
||||
//
|
||||
// LookupAggregator is safe to call concurrently.
|
||||
func (c instrumentCache[N]) LookupAggregator(id instrumentID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) {
|
||||
v := c.aggregators.Lookup(id, func() aggVal[N] {
|
||||
a, err := f()
|
||||
return aggVal[N]{Aggregator: a, Err: err}
|
||||
})
|
||||
return v.Aggregator, v.Err
|
||||
}
|
||||
|
||||
// aggVal is the cached value of an instrumentCache's aggregators cache.
|
||||
type aggVal[N int64 | float64] struct {
|
||||
Aggregator internal.Aggregator[N]
|
||||
Err error
|
||||
}
|
||||
|
||||
// Unique returns if id is unique or a duplicate instrument. If an instrument
|
||||
// with the same name has already been created, that instrumentID will be
|
||||
// returned along with false. Otherwise, id is returned with true.
|
||||
//
|
||||
// Unique is safe to call concurrently.
|
||||
func (c instrumentCache[N]) Unique(id instrumentID) (instrumentID, bool) {
|
||||
got := c.views.Lookup(id.Name, func() instrumentID { return id })
|
||||
return got, id == got
|
||||
}
|
||||
76
sdk/metric/cache_test.go
Normal file
76
sdk/metric/cache_test.go
Normal file
@@ -0,0 +1,76 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metric // import "go.opentelemetry.io/otel/sdk/metric"
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCache(t *testing.T) {
|
||||
k0, k1 := "one", "two"
|
||||
v0, v1 := 1, 2
|
||||
|
||||
c := cache[string, int]{}
|
||||
|
||||
var got int
|
||||
require.NotPanics(t, func() {
|
||||
got = c.Lookup(k0, func() int { return v0 })
|
||||
}, "zero-value cache panics on Lookup")
|
||||
assert.Equal(t, v0, got, "zero-value cache did not return fallback")
|
||||
|
||||
assert.Equal(t, v0, c.Lookup(k0, func() int { return v1 }), "existing key")
|
||||
|
||||
assert.Equal(t, v1, c.Lookup(k1, func() int { return v1 }), "non-existing key")
|
||||
}
|
||||
|
||||
func TestCacheConcurrency(t *testing.T) {
|
||||
const (
|
||||
key = "k"
|
||||
goroutines = 10
|
||||
timeoutSec = 5
|
||||
)
|
||||
|
||||
c := cache[string, int]{}
|
||||
var wg sync.WaitGroup
|
||||
for n := 0; n < goroutines; n++ {
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
assert.NotPanics(t, func() {
|
||||
c.Lookup(key, func() int { return i })
|
||||
})
|
||||
}(n)
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
assert.Eventually(t, func() bool {
|
||||
select {
|
||||
case <-done:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}, timeoutSec*time.Second, 10*time.Millisecond)
|
||||
}
|
||||
@@ -23,9 +23,32 @@ import (
|
||||
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
|
||||
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
|
||||
"go.opentelemetry.io/otel/metric/instrument/syncint64"
|
||||
"go.opentelemetry.io/otel/metric/unit"
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
)
|
||||
|
||||
// instrumentID are the identifying properties of an instrument.
|
||||
type instrumentID struct {
|
||||
// Name is the name of the instrument.
|
||||
Name string
|
||||
// Description is the description of the instrument.
|
||||
Description string
|
||||
// Unit is the unit of the instrument.
|
||||
Unit unit.Unit
|
||||
// Aggregation is the aggregation data type of the instrument.
|
||||
Aggregation string
|
||||
// Monotonic is the monotonicity of an instruments data type. This field is
|
||||
// not used for all data types, so a zero value needs to be understood in the
|
||||
// context of Aggregation.
|
||||
Monotonic bool
|
||||
// Temporality is the temporality of an instrument's data type. This field
|
||||
// is not used by some data types.
|
||||
Temporality metricdata.Temporality
|
||||
// Number is the number type of the instrument.
|
||||
Number string
|
||||
}
|
||||
|
||||
type instrumentImpl[N int64 | float64] struct {
|
||||
instrument.Asynchronous
|
||||
instrument.Synchronous
|
||||
|
||||
@@ -26,6 +26,9 @@ import (
|
||||
var now = time.Now
|
||||
|
||||
// Aggregator forms an aggregation from a collection of recorded measurements.
|
||||
//
|
||||
// Aggregators need to be comparable so they can be de-duplicated by the SDK when
|
||||
// it creates them for multiple views.
|
||||
type Aggregator[N int64 | float64] interface {
|
||||
// Aggregate records the measurement, scoped by attr, and aggregates it
|
||||
// into an aggregation.
|
||||
|
||||
@@ -20,6 +20,8 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
||||
@@ -80,23 +82,31 @@ type aggregatorTester[N int64 | float64] struct {
|
||||
func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap, eFunc expectFunc) func(*testing.T) {
|
||||
m := at.MeasurementN * at.GoroutineN
|
||||
return func(t *testing.T) {
|
||||
for i := 0; i < at.CycleN; i++ {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(at.GoroutineN)
|
||||
for i := 0; i < at.GoroutineN; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < at.MeasurementN; j++ {
|
||||
for attrs, n := range incr {
|
||||
a.Aggregate(N(n), attrs)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
t.Run("Comparable", func(t *testing.T) {
|
||||
assert.NotPanics(t, func() {
|
||||
_ = map[Aggregator[N]]struct{}{a: {}}
|
||||
})
|
||||
})
|
||||
|
||||
metricdatatest.AssertAggregationsEqual(t, eFunc(m), a.Aggregation())
|
||||
}
|
||||
t.Run("Correctness", func(t *testing.T) {
|
||||
for i := 0; i < at.CycleN; i++ {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(at.GoroutineN)
|
||||
for j := 0; j < at.GoroutineN; j++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for k := 0; k < at.MeasurementN; k++ {
|
||||
for attrs, n := range incr {
|
||||
a.Aggregate(N(n), attrs)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
metricdatatest.AssertAggregationsEqual(t, eFunc(m), a.Aggregation())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -55,10 +55,7 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter {
|
||||
defer r.Unlock()
|
||||
|
||||
if r.meters == nil {
|
||||
m := &meter{
|
||||
Scope: s,
|
||||
pipes: r.pipes,
|
||||
}
|
||||
m := newMeter(s, r.pipes)
|
||||
r.meters = map[instrumentation.Scope]*meter{s: m}
|
||||
return m
|
||||
}
|
||||
@@ -68,10 +65,7 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter {
|
||||
return m
|
||||
}
|
||||
|
||||
m = &meter{
|
||||
Scope: s,
|
||||
pipes: r.pipes,
|
||||
}
|
||||
m = newMeter(s, r.pipes)
|
||||
r.meters[s] = m
|
||||
return m
|
||||
}
|
||||
@@ -83,20 +77,45 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter {
|
||||
type meter struct {
|
||||
instrumentation.Scope
|
||||
|
||||
// *Resolvers are used by the provided instrument providers to resolve new
|
||||
// instruments aggregators and maintain a cache across instruments this
|
||||
// meter owns.
|
||||
int64Resolver resolver[int64]
|
||||
float64Resolver resolver[float64]
|
||||
|
||||
pipes pipelines
|
||||
}
|
||||
|
||||
func newMeter(s instrumentation.Scope, p pipelines) *meter {
|
||||
// viewCache ensures instrument conflicts, including number conflicts, this
|
||||
// meter is asked to create are logged to the user.
|
||||
var viewCache cache[string, instrumentID]
|
||||
|
||||
// Passing nil as the ac parameter to newInstrumentCache will have each
|
||||
// create its own aggregator cache.
|
||||
ic := newInstrumentCache[int64](nil, &viewCache)
|
||||
fc := newInstrumentCache[float64](nil, &viewCache)
|
||||
|
||||
return &meter{
|
||||
Scope: s,
|
||||
pipes: p,
|
||||
|
||||
int64Resolver: newResolver(p, ic),
|
||||
float64Resolver: newResolver(p, fc),
|
||||
}
|
||||
}
|
||||
|
||||
// Compile-time check meter implements metric.Meter.
|
||||
var _ metric.Meter = (*meter)(nil)
|
||||
|
||||
// AsyncInt64 returns the asynchronous integer instrument provider.
|
||||
func (m *meter) AsyncInt64() asyncint64.InstrumentProvider {
|
||||
return asyncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)}
|
||||
return asyncInt64Provider{scope: m.Scope, resolve: &m.int64Resolver}
|
||||
}
|
||||
|
||||
// AsyncFloat64 returns the asynchronous floating-point instrument provider.
|
||||
func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
|
||||
return asyncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)}
|
||||
return asyncFloat64Provider{scope: m.Scope, resolve: &m.float64Resolver}
|
||||
}
|
||||
|
||||
// RegisterCallback registers the function f to be called when any of the
|
||||
@@ -108,10 +127,10 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context
|
||||
|
||||
// SyncInt64 returns the synchronous integer instrument provider.
|
||||
func (m *meter) SyncInt64() syncint64.InstrumentProvider {
|
||||
return syncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)}
|
||||
return syncInt64Provider{scope: m.Scope, resolve: &m.int64Resolver}
|
||||
}
|
||||
|
||||
// SyncFloat64 returns the synchronous floating-point instrument provider.
|
||||
func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider {
|
||||
return syncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)}
|
||||
return syncFloat64Provider{scope: m.Scope, resolve: &m.float64Resolver}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"go.opentelemetry.io/otel/internal/global"
|
||||
"go.opentelemetry.io/otel/metric/unit"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregation"
|
||||
@@ -34,28 +35,19 @@ var (
|
||||
errCreatingAggregators = errors.New("could not create all aggregators")
|
||||
errIncompatibleAggregation = errors.New("incompatible aggregation")
|
||||
errUnknownAggregation = errors.New("unrecognized aggregation")
|
||||
errUnknownTemporality = errors.New("unrecognized temporality")
|
||||
)
|
||||
|
||||
type aggregator interface {
|
||||
Aggregation() metricdata.Aggregation
|
||||
}
|
||||
|
||||
// instrumentID is used to identify multiple instruments being mapped to the
|
||||
// same aggregator. e.g. using an exact match view with a name=* view. You
|
||||
// can't use a view.Instrument here because not all Aggregators are comparable.
|
||||
type instrumentID struct {
|
||||
scope instrumentation.Scope
|
||||
// instrumentSync is a synchronization point between a pipeline and an
|
||||
// instrument's Aggregators.
|
||||
type instrumentSync struct {
|
||||
name string
|
||||
description string
|
||||
}
|
||||
|
||||
type instrumentKey struct {
|
||||
name string
|
||||
unit unit.Unit
|
||||
}
|
||||
|
||||
type instrumentValue struct {
|
||||
description string
|
||||
unit unit.Unit
|
||||
aggregator aggregator
|
||||
}
|
||||
|
||||
@@ -67,7 +59,7 @@ func newPipeline(res *resource.Resource, reader Reader, views []view.View) *pipe
|
||||
resource: res,
|
||||
reader: reader,
|
||||
views: views,
|
||||
aggregations: make(map[instrumentation.Scope]map[instrumentKey]instrumentValue),
|
||||
aggregations: make(map[instrumentation.Scope][]instrumentSync),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,36 +75,23 @@ type pipeline struct {
|
||||
views []view.View
|
||||
|
||||
sync.Mutex
|
||||
aggregations map[instrumentation.Scope]map[instrumentKey]instrumentValue
|
||||
aggregations map[instrumentation.Scope][]instrumentSync
|
||||
callbacks []func(context.Context)
|
||||
}
|
||||
|
||||
var errAlreadyRegistered = errors.New("instrument already registered")
|
||||
|
||||
// addAggregator will stores an aggregator with an instrument description. The aggregator
|
||||
// is used when `produce()` is called.
|
||||
func (p *pipeline) addAggregator(scope instrumentation.Scope, name, description string, instUnit unit.Unit, agg aggregator) error {
|
||||
// addSync adds the instrumentSync to pipeline p with scope. This method is not
|
||||
// idempotent. Duplicate calls will result in duplicate additions, it is the
|
||||
// callers responsibility to ensure this is called with unique values.
|
||||
func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
if p.aggregations == nil {
|
||||
p.aggregations = map[instrumentation.Scope]map[instrumentKey]instrumentValue{}
|
||||
p.aggregations = map[instrumentation.Scope][]instrumentSync{
|
||||
scope: {iSync},
|
||||
}
|
||||
return
|
||||
}
|
||||
if p.aggregations[scope] == nil {
|
||||
p.aggregations[scope] = map[instrumentKey]instrumentValue{}
|
||||
}
|
||||
inst := instrumentKey{
|
||||
name: name,
|
||||
unit: instUnit,
|
||||
}
|
||||
if _, ok := p.aggregations[scope][inst]; ok {
|
||||
return fmt.Errorf("%w: name %s, scope: %s", errAlreadyRegistered, name, scope)
|
||||
}
|
||||
|
||||
p.aggregations[scope][inst] = instrumentValue{
|
||||
description: description,
|
||||
aggregator: agg,
|
||||
}
|
||||
return nil
|
||||
p.aggregations[scope] = append(p.aggregations[scope], iSync)
|
||||
}
|
||||
|
||||
// addCallback registers a callback to be run when `produce()` is called.
|
||||
@@ -151,12 +130,12 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
|
||||
sm := make([]metricdata.ScopeMetrics, 0, len(p.aggregations))
|
||||
for scope, instruments := range p.aggregations {
|
||||
metrics := make([]metricdata.Metrics, 0, len(instruments))
|
||||
for inst, instValue := range instruments {
|
||||
data := instValue.aggregator.Aggregation()
|
||||
for _, inst := range instruments {
|
||||
data := inst.aggregator.Aggregation()
|
||||
if data != nil {
|
||||
metrics = append(metrics, metricdata.Metrics{
|
||||
Name: inst.name,
|
||||
Description: instValue.description,
|
||||
Description: inst.description,
|
||||
Unit: inst.unit,
|
||||
Data: data,
|
||||
})
|
||||
@@ -178,20 +157,45 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
|
||||
|
||||
// inserter facilitates inserting of new instruments into a pipeline.
|
||||
type inserter[N int64 | float64] struct {
|
||||
cache instrumentCache[N]
|
||||
pipeline *pipeline
|
||||
}
|
||||
|
||||
func newInserter[N int64 | float64](p *pipeline) *inserter[N] {
|
||||
return &inserter[N]{p}
|
||||
func newInserter[N int64 | float64](p *pipeline, c instrumentCache[N]) *inserter[N] {
|
||||
return &inserter[N]{cache: c, pipeline: p}
|
||||
}
|
||||
|
||||
// Instrument inserts instrument inst with instUnit returning the Aggregators
|
||||
// that need to be updated with measurments for that instrument.
|
||||
// Instrument inserts the instrument inst with instUnit into a pipeline. All
|
||||
// views the pipeline contains are matched against, and any matching view that
|
||||
// creates a unique Aggregator will be inserted into the pipeline and included
|
||||
// in the returned slice.
|
||||
//
|
||||
// The returned Aggregators are ensured to be deduplicated and unique. If
|
||||
// another view in another pipeline that is cached by this inserter's cache has
|
||||
// already inserted the same Aggregator for the same instrument, that
|
||||
// Aggregator instance is returned.
|
||||
//
|
||||
// If another instrument has already been inserted by this inserter, or any
|
||||
// other using the same cache, and it conflicts with the instrument being
|
||||
// inserted in this call, an Aggregator matching the arguments will still be
|
||||
// returned but an Info level log message will also be logged to the OTel
|
||||
// global logger.
|
||||
//
|
||||
// If the passed instrument would result in an incompatible Aggregator, an
|
||||
// error is returned and that Aggregator is not inserted or returned.
|
||||
//
|
||||
// If an instrument is determined to use a Drop aggregation, that instrument is
|
||||
// not inserted nor returned.
|
||||
func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
|
||||
var matched bool
|
||||
seen := map[instrumentID]struct{}{}
|
||||
var aggs []internal.Aggregator[N]
|
||||
var (
|
||||
matched bool
|
||||
aggs []internal.Aggregator[N]
|
||||
)
|
||||
|
||||
errs := &multierror{wrapped: errCreatingAggregators}
|
||||
// The cache will return the same Aggregator instance. Use this fact to
|
||||
// compare pointer addresses to deduplicate Aggregators.
|
||||
seen := make(map[internal.Aggregator[N]]struct{})
|
||||
for _, v := range i.pipeline.views {
|
||||
inst, match := v.TransformInstrument(inst)
|
||||
if !match {
|
||||
@@ -199,53 +203,51 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in
|
||||
}
|
||||
matched = true
|
||||
|
||||
id := instrumentID{
|
||||
scope: inst.Scope,
|
||||
name: inst.Name,
|
||||
description: inst.Description,
|
||||
}
|
||||
if _, ok := seen[id]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
agg, err := i.aggregator(inst)
|
||||
agg, err := i.cachedAggregator(inst, instUnit)
|
||||
if err != nil {
|
||||
errs.append(err)
|
||||
continue
|
||||
}
|
||||
if agg == nil { // Drop aggregator.
|
||||
continue
|
||||
}
|
||||
// TODO (#3011): If filtering is done at the instrument level add here.
|
||||
// This is where the aggregator and the view are both in scope.
|
||||
if _, ok := seen[agg]; ok {
|
||||
// This aggregator has already been added.
|
||||
continue
|
||||
}
|
||||
seen[agg] = struct{}{}
|
||||
aggs = append(aggs, agg)
|
||||
seen[id] = struct{}{}
|
||||
err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg)
|
||||
if err != nil {
|
||||
errs.append(err)
|
||||
}
|
||||
}
|
||||
|
||||
if !matched { // Apply implicit default view if no explicit matched.
|
||||
a, err := i.aggregator(inst)
|
||||
if err != nil {
|
||||
errs.append(err)
|
||||
}
|
||||
if a != nil {
|
||||
aggs = append(aggs, a)
|
||||
err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, a)
|
||||
if err != nil {
|
||||
errs.append(err)
|
||||
}
|
||||
}
|
||||
if matched {
|
||||
return aggs, errs.errorOrNil()
|
||||
}
|
||||
|
||||
// Apply implicit default view if no explicit matched.
|
||||
agg, err := i.cachedAggregator(inst, instUnit)
|
||||
if err != nil {
|
||||
errs.append(err)
|
||||
}
|
||||
if agg != nil {
|
||||
// Ensured to have not seen given matched was false.
|
||||
aggs = append(aggs, agg)
|
||||
}
|
||||
return aggs, errs.errorOrNil()
|
||||
}
|
||||
|
||||
// aggregator returns the Aggregator for an instrument configuration. If the
|
||||
// instrument defines an unknown aggregation, an error is returned.
|
||||
func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) {
|
||||
// cachedAggregator returns the appropriate Aggregator for an instrument
|
||||
// configuration. If the exact instrument has been created within the
|
||||
// inst.Scope, that Aggregator instance will be returned. Otherwise, a new
|
||||
// computed Aggregator will be cached and returned.
|
||||
//
|
||||
// If the instrument configuration conflicts with an instrument that has
|
||||
// already been created (e.g. description, unit, data type) a warning will be
|
||||
// logged at the "Info" level with the global OTel logger. A valid new
|
||||
// Aggregator for the instrument configuration will still be returned without
|
||||
// an error.
|
||||
//
|
||||
// If the instrument defines an unknown or incompatible aggregation, an error
|
||||
// is returned.
|
||||
func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit) (internal.Aggregator[N], error) {
|
||||
switch inst.Aggregation.(type) {
|
||||
case nil, aggregation.Default:
|
||||
// Undefined, nil, means to use the default from the reader.
|
||||
@@ -259,31 +261,94 @@ func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N],
|
||||
)
|
||||
}
|
||||
|
||||
var (
|
||||
temporality = i.pipeline.reader.temporality(inst.Kind)
|
||||
monotonic bool
|
||||
)
|
||||
id := i.instrumentID(inst, u)
|
||||
// If there is a conflict, the specification says the view should
|
||||
// still be applied and a warning should be logged.
|
||||
i.logConflict(id)
|
||||
return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) {
|
||||
agg, err := i.aggregator(inst.Aggregation, id.Temporality, id.Monotonic)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if agg == nil { // Drop aggregator.
|
||||
return nil, nil
|
||||
}
|
||||
i.pipeline.addSync(inst.Scope, instrumentSync{
|
||||
name: inst.Name,
|
||||
description: inst.Description,
|
||||
unit: u,
|
||||
aggregator: agg,
|
||||
})
|
||||
return agg, err
|
||||
})
|
||||
}
|
||||
|
||||
switch inst.Kind {
|
||||
case view.AsyncCounter, view.SyncCounter, view.SyncHistogram:
|
||||
monotonic = true
|
||||
// logConflict validates if an instrument with the same name as id has already
|
||||
// been created. If that instrument conflicts with id, a warning is logged.
|
||||
func (i *inserter[N]) logConflict(id instrumentID) {
|
||||
existing, unique := i.cache.Unique(id)
|
||||
if unique {
|
||||
return
|
||||
}
|
||||
|
||||
switch agg := inst.Aggregation.(type) {
|
||||
global.Info(
|
||||
"duplicate metric stream definitions",
|
||||
"names", fmt.Sprintf("%q, %q", existing.Name, id.Name),
|
||||
"descriptions", fmt.Sprintf("%q, %q", existing.Description, id.Description),
|
||||
"units", fmt.Sprintf("%s, %s", existing.Unit, id.Unit),
|
||||
"numbers", fmt.Sprintf("%s, %s", existing.Number, id.Number),
|
||||
"aggregations", fmt.Sprintf("%s, %s", existing.Aggregation, id.Aggregation),
|
||||
"monotonics", fmt.Sprintf("%t, %t", existing.Monotonic, id.Monotonic),
|
||||
"temporalities", fmt.Sprintf("%s, %s", existing.Temporality.String(), id.Temporality.String()),
|
||||
)
|
||||
}
|
||||
|
||||
func (i *inserter[N]) instrumentID(vi view.Instrument, u unit.Unit) instrumentID {
|
||||
var zero N
|
||||
id := instrumentID{
|
||||
Name: vi.Name,
|
||||
Description: vi.Description,
|
||||
Unit: u,
|
||||
Aggregation: fmt.Sprintf("%T", vi.Aggregation),
|
||||
Temporality: i.pipeline.reader.temporality(vi.Kind),
|
||||
Number: fmt.Sprintf("%T", zero),
|
||||
}
|
||||
|
||||
switch vi.Kind {
|
||||
case view.AsyncCounter, view.SyncCounter, view.SyncHistogram:
|
||||
id.Monotonic = true
|
||||
}
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
// aggregator returns a new Aggregator matching agg, temporality, and
|
||||
// monotonic. If the agg is unknown or temporality is invalid, an error is
|
||||
// returned.
|
||||
func (i *inserter[N]) aggregator(agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) {
|
||||
switch a := agg.(type) {
|
||||
case aggregation.Drop:
|
||||
return nil, nil
|
||||
case aggregation.LastValue:
|
||||
return internal.NewLastValue[N](), nil
|
||||
case aggregation.Sum:
|
||||
if temporality == metricdata.CumulativeTemporality {
|
||||
switch temporality {
|
||||
case metricdata.CumulativeTemporality:
|
||||
return internal.NewCumulativeSum[N](monotonic), nil
|
||||
case metricdata.DeltaTemporality:
|
||||
return internal.NewDeltaSum[N](monotonic), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality)
|
||||
}
|
||||
return internal.NewDeltaSum[N](monotonic), nil
|
||||
case aggregation.ExplicitBucketHistogram:
|
||||
if temporality == metricdata.CumulativeTemporality {
|
||||
return internal.NewCumulativeHistogram[N](agg), nil
|
||||
switch temporality {
|
||||
case metricdata.CumulativeTemporality:
|
||||
return internal.NewCumulativeHistogram[N](a), nil
|
||||
case metricdata.DeltaTemporality:
|
||||
return internal.NewDeltaHistogram[N](a), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality)
|
||||
}
|
||||
return internal.NewDeltaHistogram[N](agg), nil
|
||||
}
|
||||
return nil, errUnknownAggregation
|
||||
}
|
||||
@@ -364,17 +429,17 @@ type resolver[N int64 | float64] struct {
|
||||
inserters []*inserter[N]
|
||||
}
|
||||
|
||||
func newResolver[N int64 | float64](p pipelines) *resolver[N] {
|
||||
func newResolver[N int64 | float64](p pipelines, c instrumentCache[N]) resolver[N] {
|
||||
in := make([]*inserter[N], len(p))
|
||||
for i := range in {
|
||||
in[i] = newInserter[N](p[i])
|
||||
in[i] = newInserter(p[i], c)
|
||||
}
|
||||
return &resolver[N]{in}
|
||||
return resolver[N]{in}
|
||||
}
|
||||
|
||||
// Aggregators returns the Aggregators instrument inst needs to update when it
|
||||
// makes a measurement.
|
||||
func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
|
||||
func (r resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
|
||||
var aggs []internal.Aggregator[N]
|
||||
|
||||
errs := &multierror{}
|
||||
|
||||
@@ -15,11 +15,15 @@
|
||||
package metric // import "go.opentelemetry.io/otel/sdk/metric"
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
"github.com/go-logr/logr/testr"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric/unit"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregation"
|
||||
@@ -211,7 +215,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
|
||||
}
|
||||
for _, tt := range testcases {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
i := newInserter[N](newPipeline(nil, tt.reader, tt.views))
|
||||
c := newInstrumentCache[N](nil, nil)
|
||||
i := newInserter(newPipeline(nil, tt.reader, tt.views), c)
|
||||
got, err := i.Instrument(tt.inst, unit.Dimensionless)
|
||||
assert.ErrorIs(t, err, tt.wantErr)
|
||||
require.Len(t, got, tt.wantLen)
|
||||
@@ -223,7 +228,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
|
||||
}
|
||||
|
||||
func testInvalidInstrumentShouldPanic[N int64 | float64]() {
|
||||
i := newInserter[N](newPipeline(nil, NewManualReader(), []view.View{{}}))
|
||||
c := newInstrumentCache[N](nil, nil)
|
||||
i := newInserter(newPipeline(nil, NewManualReader(), []view.View{{}}), c)
|
||||
inst := view.Instrument{
|
||||
Name: "foo",
|
||||
Kind: view.InstrumentKind(255),
|
||||
@@ -334,7 +340,8 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {
|
||||
func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) {
|
||||
inst := view.Instrument{Name: "foo", Kind: view.SyncCounter}
|
||||
|
||||
r := newResolver[int64](p)
|
||||
c := newInstrumentCache[int64](nil, nil)
|
||||
r := newResolver(p, c)
|
||||
aggs, err := r.Aggregators(inst, unit.Dimensionless)
|
||||
assert.NoError(t, err)
|
||||
|
||||
@@ -344,7 +351,8 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo
|
||||
func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) {
|
||||
inst := view.Instrument{Name: "foo", Kind: view.SyncCounter}
|
||||
|
||||
r := newResolver[float64](p)
|
||||
c := newInstrumentCache[float64](nil, nil)
|
||||
r := newResolver(p, c)
|
||||
aggs, err := r.Aggregators(inst, unit.Dimensionless)
|
||||
assert.NoError(t, err)
|
||||
|
||||
@@ -375,20 +383,40 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
|
||||
p := newPipelines(resource.Empty(), views)
|
||||
inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge}
|
||||
|
||||
ri := newResolver[int64](p)
|
||||
vc := cache[string, instrumentID]{}
|
||||
ri := newResolver(p, newInstrumentCache[int64](nil, &vc))
|
||||
intAggs, err := ri.Aggregators(inst, unit.Dimensionless)
|
||||
assert.Error(t, err)
|
||||
assert.Len(t, intAggs, 0)
|
||||
|
||||
p = newPipelines(resource.Empty(), views)
|
||||
|
||||
rf := newResolver[float64](p)
|
||||
rf := newResolver(p, newInstrumentCache[float64](nil, &vc))
|
||||
floatAggs, err := rf.Aggregators(inst, unit.Dimensionless)
|
||||
assert.Error(t, err)
|
||||
assert.Len(t, floatAggs, 0)
|
||||
}
|
||||
|
||||
func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) {
|
||||
type logCounter struct {
|
||||
logr.LogSink
|
||||
|
||||
infoN uint32
|
||||
}
|
||||
|
||||
func (l *logCounter) Info(level int, msg string, keysAndValues ...interface{}) {
|
||||
atomic.AddUint32(&l.infoN, 1)
|
||||
l.LogSink.Info(level, msg, keysAndValues...)
|
||||
}
|
||||
|
||||
func (l *logCounter) InfoN() int {
|
||||
return int(atomic.SwapUint32(&l.infoN, 0))
|
||||
}
|
||||
|
||||
func TestResolveAggregatorsDuplicateErrors(t *testing.T) {
|
||||
tLog := testr.NewWithOptions(t, testr.Options{Verbosity: 6})
|
||||
l := &logCounter{LogSink: tLog.GetSink()}
|
||||
otel.SetLogger(logr.New(l))
|
||||
|
||||
renameView, _ := view.New(
|
||||
view.MatchInstrumentName("bar"),
|
||||
view.WithRename("foo"),
|
||||
@@ -405,29 +433,40 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) {
|
||||
|
||||
p := newPipelines(resource.Empty(), views)
|
||||
|
||||
ri := newResolver[int64](p)
|
||||
vc := cache[string, instrumentID]{}
|
||||
ri := newResolver(p, newInstrumentCache[int64](nil, &vc))
|
||||
intAggs, err := ri.Aggregators(fooInst, unit.Dimensionless)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, l.InfoN(), "no info logging should happen")
|
||||
assert.Len(t, intAggs, 1)
|
||||
|
||||
// The Rename view should error, because it creates a foo instrument.
|
||||
// The Rename view should produce the same instrument without an error, the
|
||||
// default view should also cause a new aggregator to be returned.
|
||||
intAggs, err = ri.Aggregators(barInst, unit.Dimensionless)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, l.InfoN(), "no info logging should happen")
|
||||
assert.Len(t, intAggs, 2)
|
||||
|
||||
// Creating a float foo instrument should error because there is an int foo instrument.
|
||||
rf := newResolver[float64](p)
|
||||
// Creating a float foo instrument should log a warning because there is an
|
||||
// int foo instrument.
|
||||
rf := newResolver(p, newInstrumentCache[float64](nil, &vc))
|
||||
floatAggs, err := rf.Aggregators(fooInst, unit.Dimensionless)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, l.InfoN(), "instrument conflict not logged")
|
||||
assert.Len(t, floatAggs, 1)
|
||||
|
||||
fooInst = view.Instrument{Name: "foo-float", Kind: view.SyncCounter}
|
||||
|
||||
_, err = rf.Aggregators(fooInst, unit.Dimensionless)
|
||||
floatAggs, err = rf.Aggregators(fooInst, unit.Dimensionless)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, l.InfoN(), "no info logging should happen")
|
||||
assert.Len(t, floatAggs, 1)
|
||||
|
||||
floatAggs, err = rf.Aggregators(barInst, unit.Dimensionless)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
// Both the rename and default view aggregators created above should now
|
||||
// conflict. Therefore, 2 warning messages should be logged.
|
||||
assert.Equal(t, 2, l.InfoN(), "instrument conflicts not logged")
|
||||
assert.Len(t, floatAggs, 2)
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
@@ -49,8 +50,10 @@ func TestEmptyPipeline(t *testing.T) {
|
||||
assert.Nil(t, output.Resource)
|
||||
assert.Len(t, output.ScopeMetrics, 0)
|
||||
|
||||
err = pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{})
|
||||
assert.NoError(t, err)
|
||||
iSync := instrumentSync{"name", "desc", unit.Dimensionless, testSumAggregator{}}
|
||||
assert.NotPanics(t, func() {
|
||||
pipe.addSync(instrumentation.Scope{}, iSync)
|
||||
})
|
||||
|
||||
require.NotPanics(t, func() {
|
||||
pipe.addCallback(func(ctx context.Context) {})
|
||||
@@ -71,8 +74,10 @@ func TestNewPipeline(t *testing.T) {
|
||||
assert.Equal(t, resource.Empty(), output.Resource)
|
||||
assert.Len(t, output.ScopeMetrics, 0)
|
||||
|
||||
err = pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{})
|
||||
assert.NoError(t, err)
|
||||
iSync := instrumentSync{"name", "desc", unit.Dimensionless, testSumAggregator{}}
|
||||
assert.NotPanics(t, func() {
|
||||
pipe.addSync(instrumentation.Scope{}, iSync)
|
||||
})
|
||||
|
||||
require.NotPanics(t, func() {
|
||||
pipe.addCallback(func(ctx context.Context) {})
|
||||
@@ -85,99 +90,6 @@ func TestNewPipeline(t *testing.T) {
|
||||
require.Len(t, output.ScopeMetrics[0].Metrics, 1)
|
||||
}
|
||||
|
||||
func TestPipelineDuplicateRegistration(t *testing.T) {
|
||||
type instrumentID struct {
|
||||
scope instrumentation.Scope
|
||||
name string
|
||||
description string
|
||||
unit unit.Unit
|
||||
}
|
||||
testCases := []struct {
|
||||
name string
|
||||
secondInst instrumentID
|
||||
want error
|
||||
wantScopeLen int
|
||||
wantMetricsLen int
|
||||
}{
|
||||
{
|
||||
name: "exact should error",
|
||||
secondInst: instrumentID{
|
||||
scope: instrumentation.Scope{},
|
||||
name: "name",
|
||||
description: "desc",
|
||||
unit: unit.Dimensionless,
|
||||
},
|
||||
want: errAlreadyRegistered,
|
||||
wantScopeLen: 1,
|
||||
wantMetricsLen: 1,
|
||||
},
|
||||
{
|
||||
name: "description should not be identifying",
|
||||
secondInst: instrumentID{
|
||||
scope: instrumentation.Scope{},
|
||||
name: "name",
|
||||
description: "other desc",
|
||||
unit: unit.Dimensionless,
|
||||
},
|
||||
want: errAlreadyRegistered,
|
||||
wantScopeLen: 1,
|
||||
wantMetricsLen: 1,
|
||||
},
|
||||
{
|
||||
name: "scope should be identifying",
|
||||
secondInst: instrumentID{
|
||||
scope: instrumentation.Scope{
|
||||
Name: "newScope",
|
||||
},
|
||||
name: "name",
|
||||
description: "desc",
|
||||
unit: unit.Dimensionless,
|
||||
},
|
||||
wantScopeLen: 2,
|
||||
wantMetricsLen: 1,
|
||||
},
|
||||
{
|
||||
name: "name should be identifying",
|
||||
secondInst: instrumentID{
|
||||
scope: instrumentation.Scope{},
|
||||
name: "newName",
|
||||
description: "desc",
|
||||
unit: unit.Dimensionless,
|
||||
},
|
||||
wantScopeLen: 1,
|
||||
wantMetricsLen: 2,
|
||||
},
|
||||
{
|
||||
name: "unit should be identifying",
|
||||
secondInst: instrumentID{
|
||||
scope: instrumentation.Scope{},
|
||||
name: "name",
|
||||
description: "desc",
|
||||
unit: unit.Bytes,
|
||||
},
|
||||
wantScopeLen: 1,
|
||||
wantMetricsLen: 2,
|
||||
},
|
||||
}
|
||||
for _, tt := range testCases {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
pipe := newPipeline(nil, nil, nil)
|
||||
err := pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = pipe.addAggregator(tt.secondInst.scope, tt.secondInst.name, tt.secondInst.description, tt.secondInst.unit, testSumAggregator{})
|
||||
assert.ErrorIs(t, err, tt.want)
|
||||
|
||||
if tt.wantScopeLen > 0 {
|
||||
output, err := pipe.produce(context.Background())
|
||||
assert.NoError(t, err)
|
||||
require.Len(t, output.ScopeMetrics, tt.wantScopeLen)
|
||||
require.Len(t, output.ScopeMetrics[0].Metrics, tt.wantMetricsLen)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPipelineUsesResource(t *testing.T) {
|
||||
res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource"))
|
||||
pipe := newPipeline(res, nil, nil)
|
||||
@@ -201,10 +113,12 @@ func TestPipelineConcurrency(t *testing.T) {
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
go func(n int) {
|
||||
defer wg.Done()
|
||||
_ = pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{})
|
||||
}()
|
||||
name := fmt.Sprintf("name %d", n)
|
||||
sync := instrumentSync{name, "desc", unit.Dimensionless, testSumAggregator{}}
|
||||
pipe.addSync(instrumentation.Scope{}, sync)
|
||||
}(i)
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
@@ -249,7 +163,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
i := newInserter[N](test.pipe)
|
||||
c := newInstrumentCache[N](nil, nil)
|
||||
i := newInserter(test.pipe, c)
|
||||
got, err := i.Instrument(inst, unit.Dimensionless)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, got, 1, "default view not applied")
|
||||
|
||||
Reference in New Issue
Block a user