1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-30 04:40:41 +02:00

Merge instrument cache to inserter (#3724)

* Merge instrument cache to inserter

The current pipeline resolution path will only add the resolved
aggregators to the pipeline when it creates one (cache miss). It will
not add it if there is a cache hit. This means (since we cache
instruments at the meter level, not the pipeline level) the first reader
in a multiple-reader setup is the only one that will collect data for
that aggregator. All other readers will have a cache hit and nothing is
added to the pipeline. This is causing #3720.

This resolves #3720 by moving the instrument caching into the inserter.
This means aggregators are cached at the reader level, not the meter.

* Rename aggCV to aggVal

---------

Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
This commit is contained in:
Tyler Yahn 2023-02-21 08:07:37 -08:00 committed by GitHub
parent cc8bdaaad4
commit f78f72d66c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 83 additions and 95 deletions

View File

@ -40,6 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Fixed
- Ensure `go.opentelemetry.io/otel` does not use generics. (#3723, #3725)
- Multi-reader `MeterProvider`s now export metrics for all readers, instead of just the first reader. (#3720, #3724)
- Remove use of deprecated `"math/rand".Seed` in `go.opentelemetry.io/otel/example/prometheus`. (#3733)
## [1.13.0/0.36.0] 2023-02-07

View File

@ -16,8 +16,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"sync"
"go.opentelemetry.io/otel/sdk/metric/internal"
)
// cache is a locking storage used to quickly return already computed values.
@ -54,57 +52,3 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V {
c.data[key] = val
return val
}
// instrumentCache is a cache of instruments. It is scoped at the Meter level
// along with a number type. Meaning all instruments it contains need to belong
// to the same instrumentation.Scope (implicitly) and number type (explicitly).
type instrumentCache[N int64 | float64] struct {
// aggregators is used to ensure duplicate creations of the same instrument
// return the same instance of that instrument's aggregator.
aggregators *cache[streamID, aggVal[N]]
// views is used to ensure if instruments with the same name are created,
// but do not have the same identifying properties, a warning is logged.
views *cache[string, streamID]
}
// newInstrumentCache returns a new instrumentCache that uses ac as the
// underlying cache for aggregators and vc as the cache for views. If ac or vc
// are nil, a new empty cache will be used.
func newInstrumentCache[N int64 | float64](ac *cache[streamID, aggVal[N]], vc *cache[string, streamID]) instrumentCache[N] {
if ac == nil {
ac = &cache[streamID, aggVal[N]]{}
}
if vc == nil {
vc = &cache[string, streamID]{}
}
return instrumentCache[N]{aggregators: ac, views: vc}
}
// LookupAggregator returns the Aggregator and error for a cached instrument if
// it exist in the cache. Otherwise, f is called and its returned value is set
// in the cache and returned.
//
// LookupAggregator is safe to call concurrently.
func (c instrumentCache[N]) LookupAggregator(id streamID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) {
v := c.aggregators.Lookup(id, func() aggVal[N] {
a, err := f()
return aggVal[N]{Aggregator: a, Err: err}
})
return v.Aggregator, v.Err
}
// aggVal is the cached value of an instrumentCache's aggregators cache.
type aggVal[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Err error
}
// Unique returns if id is unique or a duplicate instrument. If an instrument
// with the same name has already been created, that streamID will be returned
// along with false. Otherwise, id is returned with true.
//
// Unique is safe to call concurrently.
func (c instrumentCache[N]) Unique(id streamID) (streamID, bool) {
got := c.views.Lookup(id.Name, func() streamID { return id })
return got, id == got
}

View File

@ -45,16 +45,11 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter {
// meter is asked to create are logged to the user.
var viewCache cache[string, streamID]
// Passing nil as the ac parameter to newInstrumentCache will have each
// create its own aggregator cache.
ic := newInstrumentCache[int64](nil, &viewCache)
fc := newInstrumentCache[float64](nil, &viewCache)
return &meter{
scope: s,
pipes: p,
int64IP: newInstProvider(s, p, ic),
float64IP: newInstProvider(s, p, fc),
int64IP: newInstProvider[int64](s, p, &viewCache),
float64IP: newInstProvider[float64](s, p, &viewCache),
}
}
@ -375,8 +370,8 @@ type instProvider[N int64 | float64] struct {
resolve resolver[N]
}
func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c instrumentCache[N]) *instProvider[N] {
return &instProvider[N]{scope: s, pipes: p, resolve: newResolver(p, c)}
func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *instProvider[N] {
return &instProvider[N]{scope: s, pipes: p, resolve: newResolver[N](p, c)}
}
func (p *instProvider[N]) aggs(kind InstrumentKind, name, desc string, u unit.Unit) ([]internal.Aggregator[N], error) {

View File

@ -179,12 +179,32 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
// inserter facilitates inserting of new instruments from a single scope into a
// pipeline.
type inserter[N int64 | float64] struct {
cache instrumentCache[N]
// aggregators is a cache that holds Aggregators inserted into the
// underlying reader pipeline. This cache ensures no duplicate Aggregators
// are inserted into the reader pipeline and if a new request during an
// instrument creation asks for the same Aggregator the same instance is
// returned.
aggregators *cache[streamID, aggVal[N]]
// views is a cache that holds instrument identifiers for all the
// instruments a Meter has created, it is provided from the Meter that owns
// this inserter. This cache ensures during the creation of instruments
// with the same name but different options (e.g. description, unit) a
// warning message is logged.
views *cache[string, streamID]
pipeline *pipeline
}
func newInserter[N int64 | float64](p *pipeline, c instrumentCache[N]) *inserter[N] {
return &inserter[N]{cache: c, pipeline: p}
func newInserter[N int64 | float64](p *pipeline, vc *cache[string, streamID]) *inserter[N] {
if vc == nil {
vc = &cache[string, streamID]{}
}
return &inserter[N]{
aggregators: &cache[streamID, aggVal[N]]{},
views: vc,
pipeline: p,
}
}
// Instrument inserts the instrument inst with instUnit into a pipeline. All
@ -261,6 +281,12 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]internal.Aggregator[N], err
return aggs, errs.errorOrNil()
}
// aggVal is the cached value in an aggregators cache.
type aggVal[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Err error
}
// cachedAggregator returns the appropriate Aggregator for an instrument
// configuration. If the exact instrument has been created within the
// inst.Scope, that Aggregator instance will be returned. Otherwise, a new
@ -292,13 +318,13 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
// If there is a conflict, the specification says the view should
// still be applied and a warning should be logged.
i.logConflict(id)
return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) {
cv := i.aggregators.Lookup(id, func() aggVal[N] {
agg, err := i.aggregator(stream.Aggregation, kind, id.Temporality, id.Monotonic)
if err != nil {
return nil, err
return aggVal[N]{nil, err}
}
if agg == nil { // Drop aggregator.
return nil, nil
return aggVal[N]{nil, nil}
}
if stream.AttributeFilter != nil {
agg = internal.NewFilter(agg, stream.AttributeFilter)
@ -310,15 +336,16 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
unit: stream.Unit,
aggregator: agg,
})
return agg, err
return aggVal[N]{agg, err}
})
return cv.Aggregator, cv.Err
}
// logConflict validates if an instrument with the same name as id has already
// been created. If that instrument conflicts with id, a warning is logged.
func (i *inserter[N]) logConflict(id streamID) {
existing, unique := i.cache.Unique(id)
if unique {
existing := i.views.Lookup(id.Name, func() streamID { return id })
if id == existing {
return
}
@ -491,10 +518,10 @@ type resolver[N int64 | float64] struct {
inserters []*inserter[N]
}
func newResolver[N int64 | float64](p pipelines, c instrumentCache[N]) resolver[N] {
func newResolver[N int64 | float64](p pipelines, vc *cache[string, streamID]) resolver[N] {
in := make([]*inserter[N], len(p))
for i := range in {
in[i] = newInserter(p[i], c)
in[i] = newInserter[N](p[i], vc)
}
return resolver[N]{in}
}

View File

@ -215,8 +215,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
c := newInstrumentCache[N](nil, nil)
i := newInserter(newPipeline(nil, tt.reader, tt.views), c)
var c cache[string, streamID]
i := newInserter[N](newPipeline(nil, tt.reader, tt.views), &c)
got, err := i.Instrument(tt.inst)
assert.ErrorIs(t, err, tt.wantErr)
require.Len(t, got, tt.wantLen)
@ -227,9 +227,14 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
}
}
func TestCreateAggregators(t *testing.T) {
t.Run("Int64", testCreateAggregators[int64])
t.Run("Float64", testCreateAggregators[float64])
}
func testInvalidInstrumentShouldPanic[N int64 | float64]() {
c := newInstrumentCache[N](nil, nil)
i := newInserter(newPipeline(nil, NewManualReader(), []View{defaultView}), c)
var c cache[string, streamID]
i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}), &c)
inst := Instrument{
Name: "foo",
Kind: InstrumentKind(255),
@ -242,9 +247,25 @@ func TestInvalidInstrumentShouldPanic(t *testing.T) {
assert.Panics(t, testInvalidInstrumentShouldPanic[float64])
}
func TestCreateAggregators(t *testing.T) {
t.Run("Int64", testCreateAggregators[int64])
t.Run("Float64", testCreateAggregators[float64])
func TestPipelinesAggregatorForEachReader(t *testing.T) {
r0, r1 := NewManualReader(), NewManualReader()
pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil)
require.Len(t, pipes, 2, "created pipelines")
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, streamID]
r := newResolver[int64](pipes, &c)
aggs, err := r.Aggregators(inst)
require.NoError(t, err, "resolved Aggregators error")
require.Len(t, aggs, 2, "instrument aggregators")
for i, p := range pipes {
var aggN int
for _, is := range p.aggregations {
aggN += len(is)
}
assert.Equalf(t, 1, aggN, "pipeline %d: number of instrumentSync", i)
}
}
func TestPipelineRegistryCreateAggregators(t *testing.T) {
@ -309,8 +330,8 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {
func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
c := newInstrumentCache[int64](nil, nil)
r := newResolver(p, c)
var c cache[string, streamID]
r := newResolver[int64](p, &c)
aggs, err := r.Aggregators(inst)
assert.NoError(t, err)
@ -319,8 +340,8 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo
func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
c := newInstrumentCache[float64](nil, nil)
r := newResolver(p, c)
var c cache[string, streamID]
r := newResolver[float64](p, &c)
aggs, err := r.Aggregators(inst)
assert.NoError(t, err)
@ -346,13 +367,13 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
p := newPipelines(resource.Empty(), readers, views)
inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge}
vc := cache[string, streamID]{}
ri := newResolver(p, newInstrumentCache[int64](nil, &vc))
var vc cache[string, streamID]
ri := newResolver[int64](p, &vc)
intAggs, err := ri.Aggregators(inst)
assert.Error(t, err)
assert.Len(t, intAggs, 0)
rf := newResolver(p, newInstrumentCache[float64](nil, &vc))
rf := newResolver[float64](p, &vc)
floatAggs, err := rf.Aggregators(inst)
assert.Error(t, err)
assert.Len(t, floatAggs, 0)
@ -397,8 +418,8 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {
p := newPipelines(resource.Empty(), readers, views)
vc := cache[string, streamID]{}
ri := newResolver(p, newInstrumentCache[int64](nil, &vc))
var vc cache[string, streamID]
ri := newResolver[int64](p, &vc)
intAggs, err := ri.Aggregators(fooInst)
assert.NoError(t, err)
assert.Equal(t, 0, l.InfoN(), "no info logging should happen")
@ -413,7 +434,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {
// Creating a float foo instrument should log a warning because there is an
// int foo instrument.
rf := newResolver(p, newInstrumentCache[float64](nil, &vc))
rf := newResolver[float64](p, &vc)
floatAggs, err := rf.Aggregators(fooInst)
assert.NoError(t, err)
assert.Equal(t, 1, l.InfoN(), "instrument conflict not logged")

View File

@ -159,8 +159,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := newInstrumentCache[N](nil, nil)
i := newInserter(test.pipe, c)
var c cache[string, streamID]
i := newInserter[N](test.pipe, &c)
got, err := i.Instrument(inst)
require.NoError(t, err)
assert.Len(t, got, 1, "default view not applied")