mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-02-05 13:15:41 +02:00
Associate views with MeterProvider instead of Reader (#3387)
* Split WithView from WithReader * Accept readers and views params in newPipelines * Update MeterProvider pipes init * Fix WithView comment * Fix view example MeterProvider option * Fix With{View,Reader} option in prom exporter test * Test Reader not required to be comparable * Add changes to changelog * Fix changelog option name
This commit is contained in:
parent
c8a13d6302
commit
d1aca7b167
12
CHANGELOG.md
12
CHANGELOG.md
@ -8,9 +8,21 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
- The `WithView` `Option` is added to the `go.opentelemetry.io/otel/sdk/metric` package.
|
||||
This option is used to configure the view(s) a `MeterProvider` will use for all `Reader`s that are registered with it. (#3387)
|
||||
|
||||
### Changed
|
||||
|
||||
- The `"go.opentelemetry.io/otel/sdk/metric".WithReader` option no longer accepts views to associate with the `Reader`.
|
||||
Instead, views are now registered directly with the `MeterProvider` via the new `WithView` option.
|
||||
The views registered with the `MeterProvider` apply to all `Reader`s. (#3387)
|
||||
|
||||
### Fixed
|
||||
|
||||
- The `go.opentelemetry.io/otel/exporters/prometheus` exporter fixes duplicated `_total` suffixes. (#3369)
|
||||
- Remove comparable requirement for `Reader`s. (#3387)
|
||||
- Cumulative metrics from the OpenCensus bridge (`go.opentelemetry.io/otel/bridge/opencensus`) are defined as monotonic sums, instead of non-monotonic. (#3389)
|
||||
- Asynchronous counters (`Counter` and `UpDownCounter`) from the metric SDK now produce delta sums when configured with delta temporality. (#3398)
|
||||
- Exported `Status` codes in the `go.opentelemetry.io/otel/exporters/zipkin` exporter are now exported as all upper case values. (#3340)
|
||||
|
@ -66,7 +66,10 @@ func main() {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
provider := metric.NewMeterProvider(metric.WithReader(exporter, customBucketsView, defaultView))
|
||||
provider := metric.NewMeterProvider(
|
||||
metric.WithReader(exporter),
|
||||
metric.WithView(customBucketsView, defaultView),
|
||||
)
|
||||
meter := provider.Meter(meterName)
|
||||
|
||||
// Start the prometheus HTTP server and pass the exporter Collector to it
|
||||
|
@ -260,7 +260,8 @@ func TestPrometheusExporter(t *testing.T) {
|
||||
|
||||
provider := metric.NewMeterProvider(
|
||||
metric.WithResource(res),
|
||||
metric.WithReader(exporter, customBucketsView, defaultView),
|
||||
metric.WithReader(exporter),
|
||||
metric.WithView(customBucketsView, defaultView),
|
||||
)
|
||||
meter := provider.Meter("testmeter")
|
||||
|
||||
|
@ -26,7 +26,8 @@ import (
|
||||
// config contains configuration options for a MeterProvider.
|
||||
type config struct {
|
||||
res *resource.Resource
|
||||
readers map[Reader][]view.View
|
||||
readers []Reader
|
||||
views []view.View
|
||||
}
|
||||
|
||||
// readerSignals returns a force-flush and shutdown function for a
|
||||
@ -35,7 +36,7 @@ type config struct {
|
||||
// single functions.
|
||||
func (c config) readerSignals() (forceFlush, shutdown func(context.Context) error) {
|
||||
var fFuncs, sFuncs []func(context.Context) error
|
||||
for r := range c.readers {
|
||||
for _, r := range c.readers {
|
||||
sFuncs = append(sFuncs, r.Shutdown)
|
||||
fFuncs = append(fFuncs, r.ForceFlush)
|
||||
}
|
||||
@ -112,21 +113,30 @@ func WithResource(res *resource.Resource) Option {
|
||||
})
|
||||
}
|
||||
|
||||
// WithReader associates a Reader with a MeterProvider. Any passed view config
|
||||
// will be used to associate a view with the Reader. If no views are passed
|
||||
// the default view will be use for the Reader.
|
||||
//
|
||||
// Passing this option multiple times for the same Reader will overwrite. The
|
||||
// last option passed will be the one used for that Reader.
|
||||
// WithReader associates Reader r with a MeterProvider.
|
||||
//
|
||||
// By default, if this option is not used, the MeterProvider will perform no
|
||||
// operations; no data will be exported without a Reader.
|
||||
func WithReader(r Reader, views ...view.View) Option {
|
||||
func WithReader(r Reader) Option {
|
||||
return optionFunc(func(cfg config) config {
|
||||
if cfg.readers == nil {
|
||||
cfg.readers = make(map[Reader][]view.View)
|
||||
if r == nil {
|
||||
return cfg
|
||||
}
|
||||
cfg.readers[r] = views
|
||||
cfg.readers = append(cfg.readers, r)
|
||||
return cfg
|
||||
})
|
||||
}
|
||||
|
||||
// WithView associates views a MeterProvider.
|
||||
//
|
||||
// Views are appended to existing ones in a MeterProvider if this option is
|
||||
// used multiple times.
|
||||
//
|
||||
// By default, if this option is not used, the MeterProvider will use the
|
||||
// default view.
|
||||
func WithView(views ...view.View) Option {
|
||||
return optionFunc(func(cfg config) config {
|
||||
cfg.views = append(cfg.views, views...)
|
||||
return cfg
|
||||
})
|
||||
}
|
||||
|
@ -127,5 +127,21 @@ func TestWithResource(t *testing.T) {
|
||||
func TestWithReader(t *testing.T) {
|
||||
r := &reader{}
|
||||
c := newConfig([]Option{WithReader(r)})
|
||||
assert.Contains(t, c.readers, r)
|
||||
require.Len(t, c.readers, 1)
|
||||
assert.Same(t, r, c.readers[0])
|
||||
}
|
||||
|
||||
func TestWithView(t *testing.T) {
|
||||
var views []view.View
|
||||
|
||||
v, err := view.New(view.MatchInstrumentKind(view.AsyncCounter), view.WithRename("a"))
|
||||
require.NoError(t, err)
|
||||
views = append(views, v)
|
||||
|
||||
v, err = view.New(view.MatchInstrumentKind(view.SyncCounter), view.WithRename("b"))
|
||||
require.NoError(t, err)
|
||||
views = append(views, v)
|
||||
|
||||
c := newConfig([]Option{WithView(views...)})
|
||||
assert.Equal(t, views, c.views)
|
||||
}
|
||||
|
@ -416,13 +416,13 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio
|
||||
// measurement.
|
||||
type pipelines []*pipeline
|
||||
|
||||
func newPipelines(res *resource.Resource, readers map[Reader][]view.View) pipelines {
|
||||
func newPipelines(res *resource.Resource, readers []Reader, views []view.View) pipelines {
|
||||
pipes := make([]*pipeline, 0, len(readers))
|
||||
for r, v := range readers {
|
||||
for _, r := range readers {
|
||||
p := &pipeline{
|
||||
resource: res,
|
||||
reader: r,
|
||||
views: v,
|
||||
views: views,
|
||||
}
|
||||
r.register(p)
|
||||
pipes = append(pipes, p)
|
||||
|
@ -257,7 +257,8 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
views map[Reader][]view.View
|
||||
readers []Reader
|
||||
views []view.View
|
||||
inst view.Instrument
|
||||
wantCount int
|
||||
}{
|
||||
@ -266,72 +267,46 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {
|
||||
inst: view.Instrument{Name: "foo"},
|
||||
},
|
||||
{
|
||||
name: "1 reader 1 view gets 1 aggregator",
|
||||
inst: view.Instrument{Name: "foo"},
|
||||
views: map[Reader][]view.View{
|
||||
testRdr: {
|
||||
{},
|
||||
},
|
||||
},
|
||||
name: "1 reader 1 view gets 1 aggregator",
|
||||
inst: view.Instrument{Name: "foo"},
|
||||
readers: []Reader{testRdr},
|
||||
views: []view.View{{}},
|
||||
wantCount: 1,
|
||||
},
|
||||
{
|
||||
name: "1 reader 2 views gets 2 aggregator",
|
||||
inst: view.Instrument{Name: "foo"},
|
||||
views: map[Reader][]view.View{
|
||||
testRdr: {
|
||||
{},
|
||||
renameView,
|
||||
},
|
||||
},
|
||||
name: "1 reader 2 views gets 2 aggregator",
|
||||
inst: view.Instrument{Name: "foo"},
|
||||
readers: []Reader{testRdr},
|
||||
views: []view.View{{}, renameView},
|
||||
wantCount: 2,
|
||||
},
|
||||
{
|
||||
name: "2 readers 1 view each gets 2 aggregators",
|
||||
inst: view.Instrument{Name: "foo"},
|
||||
views: map[Reader][]view.View{
|
||||
testRdr: {
|
||||
{},
|
||||
},
|
||||
testRdrHistogram: {
|
||||
{},
|
||||
},
|
||||
},
|
||||
name: "2 readers 1 view each gets 2 aggregators",
|
||||
inst: view.Instrument{Name: "foo"},
|
||||
readers: []Reader{testRdr, testRdrHistogram},
|
||||
views: []view.View{{}},
|
||||
wantCount: 2,
|
||||
},
|
||||
{
|
||||
name: "2 reader 2 views each gets 4 aggregators",
|
||||
inst: view.Instrument{Name: "foo"},
|
||||
views: map[Reader][]view.View{
|
||||
testRdr: {
|
||||
{},
|
||||
renameView,
|
||||
},
|
||||
testRdrHistogram: {
|
||||
{},
|
||||
renameView,
|
||||
},
|
||||
},
|
||||
name: "2 reader 2 views each gets 4 aggregators",
|
||||
inst: view.Instrument{Name: "foo"},
|
||||
readers: []Reader{testRdr, testRdrHistogram},
|
||||
views: []view.View{{}, renameView},
|
||||
wantCount: 4,
|
||||
},
|
||||
{
|
||||
name: "An instrument is duplicated in two views share the same aggregator",
|
||||
inst: view.Instrument{Name: "foo"},
|
||||
views: map[Reader][]view.View{
|
||||
testRdr: {
|
||||
{},
|
||||
{},
|
||||
},
|
||||
},
|
||||
name: "An instrument is duplicated in two views share the same aggregator",
|
||||
inst: view.Instrument{Name: "foo"},
|
||||
readers: []Reader{testRdr},
|
||||
views: []view.View{{}, {}},
|
||||
wantCount: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range testCases {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
p := newPipelines(resource.Empty(), tt.views)
|
||||
p := newPipelines(resource.Empty(), tt.readers, tt.views)
|
||||
testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount)
|
||||
p = newPipelines(resource.Empty(), tt.views)
|
||||
testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount)
|
||||
})
|
||||
}
|
||||
@ -362,11 +337,10 @@ func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, want
|
||||
func TestPipelineRegistryResource(t *testing.T) {
|
||||
v, err := view.New(view.MatchInstrumentName("bar"), view.WithRename("foo"))
|
||||
require.NoError(t, err)
|
||||
views := map[Reader][]view.View{
|
||||
NewManualReader(): {{}, v},
|
||||
}
|
||||
readers := []Reader{NewManualReader()}
|
||||
views := []view.View{{}, v}
|
||||
res := resource.NewSchemaless(attribute.String("key", "val"))
|
||||
pipes := newPipelines(res, views)
|
||||
pipes := newPipelines(res, readers, views)
|
||||
for _, p := range pipes {
|
||||
assert.True(t, res.Equal(p.resource), "resource not set")
|
||||
}
|
||||
@ -375,12 +349,9 @@ func TestPipelineRegistryResource(t *testing.T) {
|
||||
func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
|
||||
testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} }))
|
||||
|
||||
views := map[Reader][]view.View{
|
||||
testRdrHistogram: {
|
||||
{},
|
||||
},
|
||||
}
|
||||
p := newPipelines(resource.Empty(), views)
|
||||
readers := []Reader{testRdrHistogram}
|
||||
views := []view.View{{}}
|
||||
p := newPipelines(resource.Empty(), readers, views)
|
||||
inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge}
|
||||
|
||||
vc := cache[string, instrumentID]{}
|
||||
@ -389,8 +360,6 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
|
||||
assert.Error(t, err)
|
||||
assert.Len(t, intAggs, 0)
|
||||
|
||||
p = newPipelines(resource.Empty(), views)
|
||||
|
||||
rf := newResolver(p, newInstrumentCache[float64](nil, &vc))
|
||||
floatAggs, err := rf.Aggregators(inst, unit.Dimensionless)
|
||||
assert.Error(t, err)
|
||||
@ -421,17 +390,13 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {
|
||||
view.MatchInstrumentName("bar"),
|
||||
view.WithRename("foo"),
|
||||
)
|
||||
views := map[Reader][]view.View{
|
||||
NewManualReader(): {
|
||||
{},
|
||||
renameView,
|
||||
},
|
||||
}
|
||||
readers := []Reader{NewManualReader()}
|
||||
views := []view.View{{}, renameView}
|
||||
|
||||
fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter}
|
||||
barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter}
|
||||
|
||||
p := newPipelines(resource.Empty(), views)
|
||||
p := newPipelines(resource.Empty(), readers, views)
|
||||
|
||||
vc := cache[string, instrumentID]{}
|
||||
ri := newResolver(p, newInstrumentCache[int64](nil, &vc))
|
||||
|
@ -45,7 +45,7 @@ func NewMeterProvider(options ...Option) *MeterProvider {
|
||||
conf := newConfig(options)
|
||||
flush, sdown := conf.readerSignals()
|
||||
return &MeterProvider{
|
||||
pipes: newPipelines(conf.res, conf.readers),
|
||||
pipes: newPipelines(conf.res, conf.readers, conf.views),
|
||||
forceFlush: flush,
|
||||
shutdown: sdown,
|
||||
}
|
||||
|
@ -236,3 +236,15 @@ func TestDefaultTemporalitySelector(t *testing.T) {
|
||||
assert.Equal(t, metricdata.CumulativeTemporality, DefaultTemporalitySelector(ik))
|
||||
}
|
||||
}
|
||||
|
||||
type notComparable [0]func() // nolint:unused // non-comparable type itself is used.
|
||||
|
||||
type noCompareReader struct {
|
||||
notComparable // nolint:unused // non-comparable type itself is used.
|
||||
Reader
|
||||
}
|
||||
|
||||
func TestReadersNotRequiredToBeComparable(t *testing.T) {
|
||||
r := noCompareReader{Reader: NewManualReader()}
|
||||
assert.NotPanics(t, func() { _ = NewMeterProvider(WithReader(r)) })
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user