diff --git a/CHANGELOG.md b/CHANGELOG.md index 10ac79452..7bee3f062 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Fixed - Ensure `go.opentelemetry.io/otel` does not use generics. (#3723, #3725) +- Multi-reader `MeterProvider`s now export metrics for all readers, instead of just the first reader. (#3720, #3724) - Remove use of deprecated `"math/rand".Seed` in `go.opentelemetry.io/otel/example/prometheus`. (#3733) ## [1.13.0/0.36.0] 2023-02-07 diff --git a/sdk/metric/cache.go b/sdk/metric/cache.go index 70e293767..110e49005 100644 --- a/sdk/metric/cache.go +++ b/sdk/metric/cache.go @@ -16,8 +16,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "sync" - - "go.opentelemetry.io/otel/sdk/metric/internal" ) // cache is a locking storage used to quickly return already computed values. @@ -54,57 +52,3 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V { c.data[key] = val return val } - -// instrumentCache is a cache of instruments. It is scoped at the Meter level -// along with a number type. Meaning all instruments it contains need to belong -// to the same instrumentation.Scope (implicitly) and number type (explicitly). -type instrumentCache[N int64 | float64] struct { - // aggregators is used to ensure duplicate creations of the same instrument - // return the same instance of that instrument's aggregator. - aggregators *cache[streamID, aggVal[N]] - // views is used to ensure if instruments with the same name are created, - // but do not have the same identifying properties, a warning is logged. - views *cache[string, streamID] -} - -// newInstrumentCache returns a new instrumentCache that uses ac as the -// underlying cache for aggregators and vc as the cache for views. If ac or vc -// are nil, a new empty cache will be used. -func newInstrumentCache[N int64 | float64](ac *cache[streamID, aggVal[N]], vc *cache[string, streamID]) instrumentCache[N] { - if ac == nil { - ac = &cache[streamID, aggVal[N]]{} - } - if vc == nil { - vc = &cache[string, streamID]{} - } - return instrumentCache[N]{aggregators: ac, views: vc} -} - -// LookupAggregator returns the Aggregator and error for a cached instrument if -// it exist in the cache. Otherwise, f is called and its returned value is set -// in the cache and returned. -// -// LookupAggregator is safe to call concurrently. -func (c instrumentCache[N]) LookupAggregator(id streamID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) { - v := c.aggregators.Lookup(id, func() aggVal[N] { - a, err := f() - return aggVal[N]{Aggregator: a, Err: err} - }) - return v.Aggregator, v.Err -} - -// aggVal is the cached value of an instrumentCache's aggregators cache. -type aggVal[N int64 | float64] struct { - Aggregator internal.Aggregator[N] - Err error -} - -// Unique returns if id is unique or a duplicate instrument. If an instrument -// with the same name has already been created, that streamID will be returned -// along with false. Otherwise, id is returned with true. -// -// Unique is safe to call concurrently. -func (c instrumentCache[N]) Unique(id streamID) (streamID, bool) { - got := c.views.Lookup(id.Name, func() streamID { return id }) - return got, id == got -} diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 2863cff11..b8d290e70 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -45,16 +45,11 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter { // meter is asked to create are logged to the user. var viewCache cache[string, streamID] - // Passing nil as the ac parameter to newInstrumentCache will have each - // create its own aggregator cache. - ic := newInstrumentCache[int64](nil, &viewCache) - fc := newInstrumentCache[float64](nil, &viewCache) - return &meter{ scope: s, pipes: p, - int64IP: newInstProvider(s, p, ic), - float64IP: newInstProvider(s, p, fc), + int64IP: newInstProvider[int64](s, p, &viewCache), + float64IP: newInstProvider[float64](s, p, &viewCache), } } @@ -375,8 +370,8 @@ type instProvider[N int64 | float64] struct { resolve resolver[N] } -func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c instrumentCache[N]) *instProvider[N] { - return &instProvider[N]{scope: s, pipes: p, resolve: newResolver(p, c)} +func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *instProvider[N] { + return &instProvider[N]{scope: s, pipes: p, resolve: newResolver[N](p, c)} } func (p *instProvider[N]) aggs(kind InstrumentKind, name, desc string, u unit.Unit) ([]internal.Aggregator[N], error) { diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index ab836b2ef..c58c113e2 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -179,12 +179,32 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err // inserter facilitates inserting of new instruments from a single scope into a // pipeline. type inserter[N int64 | float64] struct { - cache instrumentCache[N] + // aggregators is a cache that holds Aggregators inserted into the + // underlying reader pipeline. This cache ensures no duplicate Aggregators + // are inserted into the reader pipeline and if a new request during an + // instrument creation asks for the same Aggregator the same instance is + // returned. + aggregators *cache[streamID, aggVal[N]] + + // views is a cache that holds instrument identifiers for all the + // instruments a Meter has created, it is provided from the Meter that owns + // this inserter. This cache ensures during the creation of instruments + // with the same name but different options (e.g. description, unit) a + // warning message is logged. + views *cache[string, streamID] + pipeline *pipeline } -func newInserter[N int64 | float64](p *pipeline, c instrumentCache[N]) *inserter[N] { - return &inserter[N]{cache: c, pipeline: p} +func newInserter[N int64 | float64](p *pipeline, vc *cache[string, streamID]) *inserter[N] { + if vc == nil { + vc = &cache[string, streamID]{} + } + return &inserter[N]{ + aggregators: &cache[streamID, aggVal[N]]{}, + views: vc, + pipeline: p, + } } // Instrument inserts the instrument inst with instUnit into a pipeline. All @@ -261,6 +281,12 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]internal.Aggregator[N], err return aggs, errs.errorOrNil() } +// aggVal is the cached value in an aggregators cache. +type aggVal[N int64 | float64] struct { + Aggregator internal.Aggregator[N] + Err error +} + // cachedAggregator returns the appropriate Aggregator for an instrument // configuration. If the exact instrument has been created within the // inst.Scope, that Aggregator instance will be returned. Otherwise, a new @@ -292,13 +318,13 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum // If there is a conflict, the specification says the view should // still be applied and a warning should be logged. i.logConflict(id) - return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) { + cv := i.aggregators.Lookup(id, func() aggVal[N] { agg, err := i.aggregator(stream.Aggregation, kind, id.Temporality, id.Monotonic) if err != nil { - return nil, err + return aggVal[N]{nil, err} } if agg == nil { // Drop aggregator. - return nil, nil + return aggVal[N]{nil, nil} } if stream.AttributeFilter != nil { agg = internal.NewFilter(agg, stream.AttributeFilter) @@ -310,15 +336,16 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum unit: stream.Unit, aggregator: agg, }) - return agg, err + return aggVal[N]{agg, err} }) + return cv.Aggregator, cv.Err } // logConflict validates if an instrument with the same name as id has already // been created. If that instrument conflicts with id, a warning is logged. func (i *inserter[N]) logConflict(id streamID) { - existing, unique := i.cache.Unique(id) - if unique { + existing := i.views.Lookup(id.Name, func() streamID { return id }) + if id == existing { return } @@ -491,10 +518,10 @@ type resolver[N int64 | float64] struct { inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines, c instrumentCache[N]) resolver[N] { +func newResolver[N int64 | float64](p pipelines, vc *cache[string, streamID]) resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { - in[i] = newInserter(p[i], c) + in[i] = newInserter[N](p[i], vc) } return resolver[N]{in} } diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 823ad72d2..d3e03dca5 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -215,8 +215,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - c := newInstrumentCache[N](nil, nil) - i := newInserter(newPipeline(nil, tt.reader, tt.views), c) + var c cache[string, streamID] + i := newInserter[N](newPipeline(nil, tt.reader, tt.views), &c) got, err := i.Instrument(tt.inst) assert.ErrorIs(t, err, tt.wantErr) require.Len(t, got, tt.wantLen) @@ -227,9 +227,14 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } } +func TestCreateAggregators(t *testing.T) { + t.Run("Int64", testCreateAggregators[int64]) + t.Run("Float64", testCreateAggregators[float64]) +} + func testInvalidInstrumentShouldPanic[N int64 | float64]() { - c := newInstrumentCache[N](nil, nil) - i := newInserter(newPipeline(nil, NewManualReader(), []View{defaultView}), c) + var c cache[string, streamID] + i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}), &c) inst := Instrument{ Name: "foo", Kind: InstrumentKind(255), @@ -242,9 +247,25 @@ func TestInvalidInstrumentShouldPanic(t *testing.T) { assert.Panics(t, testInvalidInstrumentShouldPanic[float64]) } -func TestCreateAggregators(t *testing.T) { - t.Run("Int64", testCreateAggregators[int64]) - t.Run("Float64", testCreateAggregators[float64]) +func TestPipelinesAggregatorForEachReader(t *testing.T) { + r0, r1 := NewManualReader(), NewManualReader() + pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil) + require.Len(t, pipes, 2, "created pipelines") + + inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} + var c cache[string, streamID] + r := newResolver[int64](pipes, &c) + aggs, err := r.Aggregators(inst) + require.NoError(t, err, "resolved Aggregators error") + require.Len(t, aggs, 2, "instrument aggregators") + + for i, p := range pipes { + var aggN int + for _, is := range p.aggregations { + aggN += len(is) + } + assert.Equalf(t, 1, aggN, "pipeline %d: number of instrumentSync", i) + } } func TestPipelineRegistryCreateAggregators(t *testing.T) { @@ -309,8 +330,8 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) { inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} - c := newInstrumentCache[int64](nil, nil) - r := newResolver(p, c) + var c cache[string, streamID] + r := newResolver[int64](p, &c) aggs, err := r.Aggregators(inst) assert.NoError(t, err) @@ -319,8 +340,8 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) { inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} - c := newInstrumentCache[float64](nil, nil) - r := newResolver(p, c) + var c cache[string, streamID] + r := newResolver[float64](p, &c) aggs, err := r.Aggregators(inst) assert.NoError(t, err) @@ -346,13 +367,13 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { p := newPipelines(resource.Empty(), readers, views) inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge} - vc := cache[string, streamID]{} - ri := newResolver(p, newInstrumentCache[int64](nil, &vc)) + var vc cache[string, streamID] + ri := newResolver[int64](p, &vc) intAggs, err := ri.Aggregators(inst) assert.Error(t, err) assert.Len(t, intAggs, 0) - rf := newResolver(p, newInstrumentCache[float64](nil, &vc)) + rf := newResolver[float64](p, &vc) floatAggs, err := rf.Aggregators(inst) assert.Error(t, err) assert.Len(t, floatAggs, 0) @@ -397,8 +418,8 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) { p := newPipelines(resource.Empty(), readers, views) - vc := cache[string, streamID]{} - ri := newResolver(p, newInstrumentCache[int64](nil, &vc)) + var vc cache[string, streamID] + ri := newResolver[int64](p, &vc) intAggs, err := ri.Aggregators(fooInst) assert.NoError(t, err) assert.Equal(t, 0, l.InfoN(), "no info logging should happen") @@ -413,7 +434,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) { // Creating a float foo instrument should log a warning because there is an // int foo instrument. - rf := newResolver(p, newInstrumentCache[float64](nil, &vc)) + rf := newResolver[float64](p, &vc) floatAggs, err := rf.Aggregators(fooInst) assert.NoError(t, err) assert.Equal(t, 1, l.InfoN(), "instrument conflict not logged") diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 3c5e19b8b..ded48ac16 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -159,8 +159,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - c := newInstrumentCache[N](nil, nil) - i := newInserter(test.pipe, c) + var c cache[string, streamID] + i := newInserter[N](test.pipe, &c) got, err := i.Instrument(inst) require.NoError(t, err) assert.Len(t, got, 1, "default view not applied")