You've already forked opentelemetry-go
							
							
				mirror of
				https://github.com/open-telemetry/opentelemetry-go.git
				synced 2025-10-31 00:07:40 +02:00 
			
		
		
		
	Restructure instrument creation code paths (#3256)
* Add BenchmarkInstrumentCreation * Unify instrument provider * Resolve import shadow * Update sdk/metric/pipeline.go Co-authored-by: Anthony Mirabella <a9@aneurysm9.com> * Punctuate to fix lint Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com> Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
This commit is contained in:
		| @@ -22,251 +22,140 @@ import ( | ||||
| 	"go.opentelemetry.io/otel/metric/instrument/asyncint64" | ||||
| 	"go.opentelemetry.io/otel/metric/instrument/syncfloat64" | ||||
| 	"go.opentelemetry.io/otel/metric/instrument/syncint64" | ||||
| 	"go.opentelemetry.io/otel/metric/unit" | ||||
| 	"go.opentelemetry.io/otel/sdk/instrumentation" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/view" | ||||
| ) | ||||
|  | ||||
| // instProviderKey uniquely describes an instrument creation request received | ||||
| // by an instrument provider. | ||||
| type instProviderKey struct { | ||||
| 	// Name is the name of the instrument. | ||||
| 	Name string | ||||
| 	// Description is the description of the instrument. | ||||
| 	Description string | ||||
| 	// Unit is the unit of the instrument. | ||||
| 	Unit unit.Unit | ||||
| 	// Kind is the instrument Kind provided. | ||||
| 	Kind view.InstrumentKind | ||||
| } | ||||
|  | ||||
| // viewInst returns the instProviderKey as a view Instrument using scope s. | ||||
| func (k instProviderKey) viewInst(s instrumentation.Scope) view.Instrument { | ||||
| 	return view.Instrument{ | ||||
| 		Scope:       s, | ||||
| 		Name:        k.Name, | ||||
| 		Description: k.Description, | ||||
| 		Kind:        k.Kind, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // instProvider provides all OpenTelemetry instruments. | ||||
| type instProvider[N int64 | float64] struct { | ||||
| 	resolve resolver[N] | ||||
| } | ||||
|  | ||||
| func newInstProvider[N int64 | float64](r resolver[N]) *instProvider[N] { | ||||
| 	return &instProvider[N]{resolve: r} | ||||
| } | ||||
|  | ||||
| // lookup returns the resolved instrumentImpl. | ||||
| func (p *instProvider[N]) lookup(kind view.InstrumentKind, name string, opts []instrument.Option) (*instrumentImpl[N], error) { | ||||
| 	cfg := instrument.NewConfig(opts...) | ||||
| 	key := instProviderKey{ | ||||
| 		Name:        name, | ||||
| 		Description: cfg.Description(), | ||||
| 		Unit:        cfg.Unit(), | ||||
| 		Kind:        kind, | ||||
| 	} | ||||
|  | ||||
| 	aggs, err := p.resolve.Aggregators(key) | ||||
| 	if len(aggs) == 0 && err != nil { | ||||
| 		err = fmt.Errorf("instrument does not match any view: %w", err) | ||||
| 	} | ||||
| 	return &instrumentImpl[N]{aggregators: aggs}, err | ||||
| } | ||||
|  | ||||
| type asyncInt64Provider struct { | ||||
| 	scope   instrumentation.Scope | ||||
| 	resolve *resolver[int64] | ||||
| 	*instProvider[int64] | ||||
| } | ||||
|  | ||||
| var _ asyncint64.InstrumentProvider = asyncInt64Provider{} | ||||
|  | ||||
| // Counter creates an instrument for recording increasing values. | ||||
| func (p asyncInt64Provider) Counter(name string, opts ...instrument.Option) (asyncint64.Counter, error) { | ||||
| 	cfg := instrument.NewConfig(opts...) | ||||
|  | ||||
| 	aggs, err := p.resolve.Aggregators(view.Instrument{ | ||||
| 		Scope:       p.scope, | ||||
| 		Name:        name, | ||||
| 		Description: cfg.Description(), | ||||
| 		Kind:        view.AsyncCounter, | ||||
| 	}, cfg.Unit()) | ||||
| 	if len(aggs) == 0 && err != nil { | ||||
| 		err = fmt.Errorf("instrument does not match any view: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	return &instrumentImpl[int64]{ | ||||
| 		aggregators: aggs, | ||||
| 	}, err | ||||
| 	return p.lookup(view.AsyncCounter, name, opts) | ||||
| } | ||||
|  | ||||
| // UpDownCounter creates an instrument for recording changes of a value. | ||||
| func (p asyncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncint64.UpDownCounter, error) { | ||||
| 	cfg := instrument.NewConfig(opts...) | ||||
|  | ||||
| 	aggs, err := p.resolve.Aggregators(view.Instrument{ | ||||
| 		Scope:       p.scope, | ||||
| 		Name:        name, | ||||
| 		Description: cfg.Description(), | ||||
| 		Kind:        view.AsyncUpDownCounter, | ||||
| 	}, cfg.Unit()) | ||||
| 	if len(aggs) == 0 && err != nil { | ||||
| 		err = fmt.Errorf("instrument does not match any view: %w", err) | ||||
| 	} | ||||
| 	return &instrumentImpl[int64]{ | ||||
| 		aggregators: aggs, | ||||
| 	}, err | ||||
| 	return p.lookup(view.AsyncUpDownCounter, name, opts) | ||||
| } | ||||
|  | ||||
| // Gauge creates an instrument for recording the current value. | ||||
| func (p asyncInt64Provider) Gauge(name string, opts ...instrument.Option) (asyncint64.Gauge, error) { | ||||
| 	cfg := instrument.NewConfig(opts...) | ||||
|  | ||||
| 	aggs, err := p.resolve.Aggregators(view.Instrument{ | ||||
| 		Scope:       p.scope, | ||||
| 		Name:        name, | ||||
| 		Description: cfg.Description(), | ||||
| 		Kind:        view.AsyncGauge, | ||||
| 	}, cfg.Unit()) | ||||
| 	if len(aggs) == 0 && err != nil { | ||||
| 		err = fmt.Errorf("instrument does not match any view: %w", err) | ||||
| 	} | ||||
| 	return &instrumentImpl[int64]{ | ||||
| 		aggregators: aggs, | ||||
| 	}, err | ||||
| 	return p.lookup(view.AsyncGauge, name, opts) | ||||
| } | ||||
|  | ||||
| type asyncFloat64Provider struct { | ||||
| 	scope   instrumentation.Scope | ||||
| 	resolve *resolver[float64] | ||||
| 	*instProvider[float64] | ||||
| } | ||||
|  | ||||
| var _ asyncfloat64.InstrumentProvider = asyncFloat64Provider{} | ||||
|  | ||||
| // Counter creates an instrument for recording increasing values. | ||||
| func (p asyncFloat64Provider) Counter(name string, opts ...instrument.Option) (asyncfloat64.Counter, error) { | ||||
| 	cfg := instrument.NewConfig(opts...) | ||||
|  | ||||
| 	aggs, err := p.resolve.Aggregators(view.Instrument{ | ||||
| 		Scope:       p.scope, | ||||
| 		Name:        name, | ||||
| 		Description: cfg.Description(), | ||||
| 		Kind:        view.AsyncCounter, | ||||
| 	}, cfg.Unit()) | ||||
| 	if len(aggs) == 0 && err != nil { | ||||
| 		err = fmt.Errorf("instrument does not match any view: %w", err) | ||||
| 	} | ||||
| 	return &instrumentImpl[float64]{ | ||||
| 		aggregators: aggs, | ||||
| 	}, err | ||||
| 	return p.lookup(view.AsyncCounter, name, opts) | ||||
| } | ||||
|  | ||||
| // UpDownCounter creates an instrument for recording changes of a value. | ||||
| func (p asyncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncfloat64.UpDownCounter, error) { | ||||
| 	cfg := instrument.NewConfig(opts...) | ||||
|  | ||||
| 	aggs, err := p.resolve.Aggregators(view.Instrument{ | ||||
| 		Scope:       p.scope, | ||||
| 		Name:        name, | ||||
| 		Description: cfg.Description(), | ||||
| 		Kind:        view.AsyncUpDownCounter, | ||||
| 	}, cfg.Unit()) | ||||
| 	if len(aggs) == 0 && err != nil { | ||||
| 		err = fmt.Errorf("instrument does not match any view: %w", err) | ||||
| 	} | ||||
| 	return &instrumentImpl[float64]{ | ||||
| 		aggregators: aggs, | ||||
| 	}, err | ||||
| 	return p.lookup(view.AsyncUpDownCounter, name, opts) | ||||
| } | ||||
|  | ||||
| // Gauge creates an instrument for recording the current value. | ||||
| func (p asyncFloat64Provider) Gauge(name string, opts ...instrument.Option) (asyncfloat64.Gauge, error) { | ||||
| 	cfg := instrument.NewConfig(opts...) | ||||
|  | ||||
| 	aggs, err := p.resolve.Aggregators(view.Instrument{ | ||||
| 		Scope:       p.scope, | ||||
| 		Name:        name, | ||||
| 		Description: cfg.Description(), | ||||
| 		Kind:        view.AsyncGauge, | ||||
| 	}, cfg.Unit()) | ||||
| 	if len(aggs) == 0 && err != nil { | ||||
| 		err = fmt.Errorf("instrument does not match any view: %w", err) | ||||
| 	} | ||||
| 	return &instrumentImpl[float64]{ | ||||
| 		aggregators: aggs, | ||||
| 	}, err | ||||
| 	return p.lookup(view.AsyncGauge, name, opts) | ||||
| } | ||||
|  | ||||
| type syncInt64Provider struct { | ||||
| 	scope   instrumentation.Scope | ||||
| 	resolve *resolver[int64] | ||||
| 	*instProvider[int64] | ||||
| } | ||||
|  | ||||
| var _ syncint64.InstrumentProvider = syncInt64Provider{} | ||||
|  | ||||
| // Counter creates an instrument for recording increasing values. | ||||
| func (p syncInt64Provider) Counter(name string, opts ...instrument.Option) (syncint64.Counter, error) { | ||||
| 	cfg := instrument.NewConfig(opts...) | ||||
|  | ||||
| 	aggs, err := p.resolve.Aggregators(view.Instrument{ | ||||
| 		Scope:       p.scope, | ||||
| 		Name:        name, | ||||
| 		Description: cfg.Description(), | ||||
| 		Kind:        view.SyncCounter, | ||||
| 	}, cfg.Unit()) | ||||
| 	if len(aggs) == 0 && err != nil { | ||||
| 		err = fmt.Errorf("instrument does not match any view: %w", err) | ||||
| 	} | ||||
| 	return &instrumentImpl[int64]{ | ||||
| 		aggregators: aggs, | ||||
| 	}, err | ||||
| 	return p.lookup(view.SyncCounter, name, opts) | ||||
| } | ||||
|  | ||||
| // UpDownCounter creates an instrument for recording changes of a value. | ||||
| func (p syncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (syncint64.UpDownCounter, error) { | ||||
| 	cfg := instrument.NewConfig(opts...) | ||||
|  | ||||
| 	aggs, err := p.resolve.Aggregators(view.Instrument{ | ||||
| 		Scope:       p.scope, | ||||
| 		Name:        name, | ||||
| 		Description: cfg.Description(), | ||||
| 		Kind:        view.SyncUpDownCounter, | ||||
| 	}, cfg.Unit()) | ||||
| 	if len(aggs) == 0 && err != nil { | ||||
| 		err = fmt.Errorf("instrument does not match any view: %w", err) | ||||
| 	} | ||||
| 	return &instrumentImpl[int64]{ | ||||
| 		aggregators: aggs, | ||||
| 	}, err | ||||
| 	return p.lookup(view.SyncUpDownCounter, name, opts) | ||||
| } | ||||
|  | ||||
| // Histogram creates an instrument for recording the current value. | ||||
| func (p syncInt64Provider) Histogram(name string, opts ...instrument.Option) (syncint64.Histogram, error) { | ||||
| 	cfg := instrument.NewConfig(opts...) | ||||
|  | ||||
| 	aggs, err := p.resolve.Aggregators(view.Instrument{ | ||||
| 		Scope:       p.scope, | ||||
| 		Name:        name, | ||||
| 		Description: cfg.Description(), | ||||
| 		Kind:        view.SyncHistogram, | ||||
| 	}, cfg.Unit()) | ||||
| 	if len(aggs) == 0 && err != nil { | ||||
| 		err = fmt.Errorf("instrument does not match any view: %w", err) | ||||
| 	} | ||||
| 	return &instrumentImpl[int64]{ | ||||
| 		aggregators: aggs, | ||||
| 	}, err | ||||
| 	return p.lookup(view.SyncHistogram, name, opts) | ||||
| } | ||||
|  | ||||
| type syncFloat64Provider struct { | ||||
| 	scope   instrumentation.Scope | ||||
| 	resolve *resolver[float64] | ||||
| 	*instProvider[float64] | ||||
| } | ||||
|  | ||||
| var _ syncfloat64.InstrumentProvider = syncFloat64Provider{} | ||||
|  | ||||
| // Counter creates an instrument for recording increasing values. | ||||
| func (p syncFloat64Provider) Counter(name string, opts ...instrument.Option) (syncfloat64.Counter, error) { | ||||
| 	cfg := instrument.NewConfig(opts...) | ||||
|  | ||||
| 	aggs, err := p.resolve.Aggregators(view.Instrument{ | ||||
| 		Scope:       p.scope, | ||||
| 		Name:        name, | ||||
| 		Description: cfg.Description(), | ||||
| 		Kind:        view.SyncCounter, | ||||
| 	}, cfg.Unit()) | ||||
| 	if len(aggs) == 0 && err != nil { | ||||
| 		err = fmt.Errorf("instrument does not match any view: %w", err) | ||||
| 	} | ||||
| 	return &instrumentImpl[float64]{ | ||||
| 		aggregators: aggs, | ||||
| 	}, err | ||||
| 	return p.lookup(view.SyncCounter, name, opts) | ||||
| } | ||||
|  | ||||
| // UpDownCounter creates an instrument for recording changes of a value. | ||||
| func (p syncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (syncfloat64.UpDownCounter, error) { | ||||
| 	cfg := instrument.NewConfig(opts...) | ||||
|  | ||||
| 	aggs, err := p.resolve.Aggregators(view.Instrument{ | ||||
| 		Scope:       p.scope, | ||||
| 		Name:        name, | ||||
| 		Description: cfg.Description(), | ||||
| 		Kind:        view.SyncUpDownCounter, | ||||
| 	}, cfg.Unit()) | ||||
| 	if len(aggs) == 0 && err != nil { | ||||
| 		err = fmt.Errorf("instrument does not match any view: %w", err) | ||||
| 	} | ||||
| 	return &instrumentImpl[float64]{ | ||||
| 		aggregators: aggs, | ||||
| 	}, err | ||||
| 	return p.lookup(view.SyncUpDownCounter, name, opts) | ||||
| } | ||||
|  | ||||
| // Histogram creates an instrument for recording the current value. | ||||
| func (p syncFloat64Provider) Histogram(name string, opts ...instrument.Option) (syncfloat64.Histogram, error) { | ||||
| 	cfg := instrument.NewConfig(opts...) | ||||
|  | ||||
| 	aggs, err := p.resolve.Aggregators(view.Instrument{ | ||||
| 		Scope:       p.scope, | ||||
| 		Name:        name, | ||||
| 		Description: cfg.Description(), | ||||
| 		Kind:        view.SyncHistogram, | ||||
| 	}, cfg.Unit()) | ||||
| 	if len(aggs) == 0 && err != nil { | ||||
| 		err = fmt.Errorf("instrument does not match any view: %w", err) | ||||
| 	} | ||||
| 	return &instrumentImpl[float64]{ | ||||
| 		aggregators: aggs, | ||||
| 	}, err | ||||
| 	return p.lookup(view.SyncHistogram, name, opts) | ||||
| } | ||||
|   | ||||
| @@ -31,15 +31,10 @@ import ( | ||||
| // produced by an instrumentation scope will use metric instruments from a | ||||
| // single meter. | ||||
| type meter struct { | ||||
| 	instrumentation.Scope | ||||
|  | ||||
| 	// *Resolvers are used by the provided instrument providers to resolve new | ||||
| 	// instruments aggregators and maintain a cache across instruments this | ||||
| 	// meter owns. | ||||
| 	int64Resolver   resolver[int64] | ||||
| 	float64Resolver resolver[float64] | ||||
|  | ||||
| 	pipes pipelines | ||||
|  | ||||
| 	instProviderInt64   *instProvider[int64] | ||||
| 	instProviderFloat64 *instProvider[float64] | ||||
| } | ||||
|  | ||||
| func newMeter(s instrumentation.Scope, p pipelines) *meter { | ||||
| @@ -52,12 +47,13 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter { | ||||
| 	ic := newInstrumentCache[int64](nil, &viewCache) | ||||
| 	fc := newInstrumentCache[float64](nil, &viewCache) | ||||
|  | ||||
| 	return &meter{ | ||||
| 		Scope: s, | ||||
| 		pipes: p, | ||||
| 	ir := newResolver(s, p, ic) | ||||
| 	fr := newResolver(s, p, fc) | ||||
|  | ||||
| 		int64Resolver:   newResolver(p, ic), | ||||
| 		float64Resolver: newResolver(p, fc), | ||||
| 	return &meter{ | ||||
| 		pipes:               p, | ||||
| 		instProviderInt64:   newInstProvider(ir), | ||||
| 		instProviderFloat64: newInstProvider(fr), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -66,12 +62,12 @@ var _ metric.Meter = (*meter)(nil) | ||||
|  | ||||
| // AsyncInt64 returns the asynchronous integer instrument provider. | ||||
| func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { | ||||
| 	return asyncInt64Provider{scope: m.Scope, resolve: &m.int64Resolver} | ||||
| 	return asyncInt64Provider{m.instProviderInt64} | ||||
| } | ||||
|  | ||||
| // AsyncFloat64 returns the asynchronous floating-point instrument provider. | ||||
| func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { | ||||
| 	return asyncFloat64Provider{scope: m.Scope, resolve: &m.float64Resolver} | ||||
| 	return asyncFloat64Provider{m.instProviderFloat64} | ||||
| } | ||||
|  | ||||
| // RegisterCallback registers the function f to be called when any of the | ||||
| @@ -83,10 +79,10 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context | ||||
|  | ||||
| // SyncInt64 returns the synchronous integer instrument provider. | ||||
| func (m *meter) SyncInt64() syncint64.InstrumentProvider { | ||||
| 	return syncInt64Provider{scope: m.Scope, resolve: &m.int64Resolver} | ||||
| 	return syncInt64Provider{m.instProviderInt64} | ||||
| } | ||||
|  | ||||
| // SyncFloat64 returns the synchronous floating-point instrument provider. | ||||
| func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { | ||||
| 	return syncFloat64Provider{scope: m.Scope, resolve: &m.float64Resolver} | ||||
| 	return syncFloat64Provider{m.instProviderFloat64} | ||||
| } | ||||
|   | ||||
| @@ -25,6 +25,10 @@ import ( | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| 	"go.opentelemetry.io/otel/metric" | ||||
| 	"go.opentelemetry.io/otel/metric/instrument" | ||||
| 	"go.opentelemetry.io/otel/metric/instrument/asyncfloat64" | ||||
| 	"go.opentelemetry.io/otel/metric/instrument/asyncint64" | ||||
| 	"go.opentelemetry.io/otel/metric/instrument/syncfloat64" | ||||
| 	"go.opentelemetry.io/otel/metric/instrument/syncint64" | ||||
| 	"go.opentelemetry.io/otel/sdk/instrumentation" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/metricdata" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" | ||||
| @@ -828,3 +832,47 @@ func TestAttributeFilter(t *testing.T) { | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| var ( | ||||
| 	aiCounter       asyncint64.Counter | ||||
| 	aiUpDownCounter asyncint64.UpDownCounter | ||||
| 	aiGauge         asyncint64.Gauge | ||||
|  | ||||
| 	afCounter       asyncfloat64.Counter | ||||
| 	afUpDownCounter asyncfloat64.UpDownCounter | ||||
| 	afGauge         asyncfloat64.Gauge | ||||
|  | ||||
| 	siCounter       syncint64.Counter | ||||
| 	siUpDownCounter syncint64.UpDownCounter | ||||
| 	siHistogram     syncint64.Histogram | ||||
|  | ||||
| 	sfCounter       syncfloat64.Counter | ||||
| 	sfUpDownCounter syncfloat64.UpDownCounter | ||||
| 	sfHistogram     syncfloat64.Histogram | ||||
| ) | ||||
|  | ||||
| func BenchmarkInstrumentCreation(b *testing.B) { | ||||
| 	provider := NewMeterProvider(WithReader(NewManualReader())) | ||||
| 	meter := provider.Meter("BenchmarkInstrumentCreation") | ||||
|  | ||||
| 	b.ReportAllocs() | ||||
| 	b.ResetTimer() | ||||
|  | ||||
| 	for n := 0; n < b.N; n++ { | ||||
| 		aiCounter, _ = meter.AsyncInt64().Counter("async.int64.counter") | ||||
| 		aiUpDownCounter, _ = meter.AsyncInt64().UpDownCounter("async.int64.up.down.counter") | ||||
| 		aiGauge, _ = meter.AsyncInt64().Gauge("async.int64.gauge") | ||||
|  | ||||
| 		afCounter, _ = meter.AsyncFloat64().Counter("async.float64.counter") | ||||
| 		afUpDownCounter, _ = meter.AsyncFloat64().UpDownCounter("async.float64.up.down.counter") | ||||
| 		afGauge, _ = meter.AsyncFloat64().Gauge("async.float64.gauge") | ||||
|  | ||||
| 		siCounter, _ = meter.SyncInt64().Counter("sync.int64.counter") | ||||
| 		siUpDownCounter, _ = meter.SyncInt64().UpDownCounter("sync.int64.up.down.counter") | ||||
| 		siHistogram, _ = meter.SyncInt64().Histogram("sync.int64.histogram") | ||||
|  | ||||
| 		sfCounter, _ = meter.SyncFloat64().Counter("sync.float64.counter") | ||||
| 		sfUpDownCounter, _ = meter.SyncFloat64().UpDownCounter("sync.float64.up.down.counter") | ||||
| 		sfHistogram, _ = meter.SyncFloat64().Histogram("sync.float64.histogram") | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -156,14 +156,16 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| // inserter facilitates inserting of new instruments into a pipeline. | ||||
| // inserter facilitates inserting of new instruments from a single scope into a | ||||
| // pipeline. | ||||
| type inserter[N int64 | float64] struct { | ||||
| 	scope    instrumentation.Scope | ||||
| 	cache    instrumentCache[N] | ||||
| 	pipeline *pipeline | ||||
| } | ||||
|  | ||||
| func newInserter[N int64 | float64](p *pipeline, c instrumentCache[N]) *inserter[N] { | ||||
| 	return &inserter[N]{cache: c, pipeline: p} | ||||
| func newInserter[N int64 | float64](s instrumentation.Scope, p *pipeline, c instrumentCache[N]) *inserter[N] { | ||||
| 	return &inserter[N]{scope: s, cache: c, pipeline: p} | ||||
| } | ||||
|  | ||||
| // Instrument inserts the instrument inst with instUnit into a pipeline. All | ||||
| @@ -187,7 +189,7 @@ func newInserter[N int64 | float64](p *pipeline, c instrumentCache[N]) *inserter | ||||
| // | ||||
| // If an instrument is determined to use a Drop aggregation, that instrument is | ||||
| // not inserted nor returned. | ||||
| func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { | ||||
| func (i *inserter[N]) Instrument(key instProviderKey) ([]internal.Aggregator[N], error) { | ||||
| 	var ( | ||||
| 		matched bool | ||||
| 		aggs    []internal.Aggregator[N] | ||||
| @@ -197,6 +199,7 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in | ||||
| 	// The cache will return the same Aggregator instance. Use this fact to | ||||
| 	// compare pointer addresses to deduplicate Aggregators. | ||||
| 	seen := make(map[internal.Aggregator[N]]struct{}) | ||||
| 	inst := key.viewInst(i.scope) | ||||
| 	for _, v := range i.pipeline.views { | ||||
| 		inst, match := v.TransformInstrument(inst) | ||||
| 		if !match { | ||||
| @@ -204,7 +207,7 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in | ||||
| 		} | ||||
| 		matched = true | ||||
|  | ||||
| 		agg, err := i.cachedAggregator(inst, instUnit, v.AttributeFilter()) | ||||
| 		agg, err := i.cachedAggregator(inst, key.Unit, v.AttributeFilter()) | ||||
| 		if err != nil { | ||||
| 			errs.append(err) | ||||
| 		} | ||||
| @@ -224,7 +227,7 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in | ||||
| 	} | ||||
|  | ||||
| 	// Apply implicit default view if no explicit matched. | ||||
| 	agg, err := i.cachedAggregator(inst, instUnit, nil) | ||||
| 	agg, err := i.cachedAggregator(inst, key.Unit, nil) | ||||
| 	if err != nil { | ||||
| 		errs.append(err) | ||||
| 	} | ||||
| @@ -449,22 +452,22 @@ type resolver[N int64 | float64] struct { | ||||
| 	inserters []*inserter[N] | ||||
| } | ||||
|  | ||||
| func newResolver[N int64 | float64](p pipelines, c instrumentCache[N]) resolver[N] { | ||||
| func newResolver[N int64 | float64](s instrumentation.Scope, p pipelines, c instrumentCache[N]) resolver[N] { | ||||
| 	in := make([]*inserter[N], len(p)) | ||||
| 	for i := range in { | ||||
| 		in[i] = newInserter(p[i], c) | ||||
| 		in[i] = newInserter(s, p[i], c) | ||||
| 	} | ||||
| 	return resolver[N]{in} | ||||
| } | ||||
|  | ||||
| // Aggregators returns the Aggregators instrument inst needs to update when it | ||||
| // makes a measurement. | ||||
| func (r resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { | ||||
| // Aggregators returns the Aggregators that must be updated by the instrument | ||||
| // defined by key. | ||||
| func (r resolver[N]) Aggregators(key instProviderKey) ([]internal.Aggregator[N], error) { | ||||
| 	var aggs []internal.Aggregator[N] | ||||
|  | ||||
| 	errs := &multierror{} | ||||
| 	for _, i := range r.inserters { | ||||
| 		a, err := i.Instrument(inst, instUnit) | ||||
| 		a, err := i.Instrument(key) | ||||
| 		if err != nil { | ||||
| 			errs.append(err) | ||||
| 		} | ||||
|   | ||||
| @@ -25,7 +25,7 @@ import ( | ||||
|  | ||||
| 	"go.opentelemetry.io/otel" | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| 	"go.opentelemetry.io/otel/metric/unit" | ||||
| 	"go.opentelemetry.io/otel/sdk/instrumentation" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregation" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/internal" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/view" | ||||
| @@ -61,7 +61,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { | ||||
| 		view.WithSetAggregation(invalidAggregation{}), | ||||
| 	) | ||||
|  | ||||
| 	instruments := []view.Instrument{ | ||||
| 	instruments := []instProviderKey{ | ||||
| 		{Name: "foo", Kind: view.InstrumentKind(0)}, //Unknown kind | ||||
| 		{Name: "foo", Kind: view.SyncCounter}, | ||||
| 		{Name: "foo", Kind: view.SyncUpDownCounter}, | ||||
| @@ -75,7 +75,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { | ||||
| 		name     string | ||||
| 		reader   Reader | ||||
| 		views    []view.View | ||||
| 		inst     view.Instrument | ||||
| 		inst     instProviderKey | ||||
| 		wantKind internal.Aggregator[N] //Aggregators should match len and types | ||||
| 		wantLen  int | ||||
| 		wantErr  error | ||||
| @@ -213,11 +213,12 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { | ||||
| 			wantErr: errCreatingAggregators, | ||||
| 		}, | ||||
| 	} | ||||
| 	s := instrumentation.Scope{Name: "testCreateAggregators"} | ||||
| 	for _, tt := range testcases { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			c := newInstrumentCache[N](nil, nil) | ||||
| 			i := newInserter(newPipeline(nil, tt.reader, tt.views), c) | ||||
| 			got, err := i.Instrument(tt.inst, unit.Dimensionless) | ||||
| 			i := newInserter(s, newPipeline(nil, tt.reader, tt.views), c) | ||||
| 			got, err := i.Instrument(tt.inst) | ||||
| 			assert.ErrorIs(t, err, tt.wantErr) | ||||
| 			require.Len(t, got, tt.wantLen) | ||||
| 			for _, agg := range got { | ||||
| @@ -229,12 +230,13 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { | ||||
|  | ||||
| func testInvalidInstrumentShouldPanic[N int64 | float64]() { | ||||
| 	c := newInstrumentCache[N](nil, nil) | ||||
| 	i := newInserter(newPipeline(nil, NewManualReader(), []view.View{{}}), c) | ||||
| 	inst := view.Instrument{ | ||||
| 	s := instrumentation.Scope{Name: "testInvalidInstrumentShouldPanic"} | ||||
| 	i := newInserter(s, newPipeline(nil, NewManualReader(), []view.View{{}}), c) | ||||
| 	inst := instProviderKey{ | ||||
| 		Name: "foo", | ||||
| 		Kind: view.InstrumentKind(255), | ||||
| 	} | ||||
| 	_, _ = i.Instrument(inst, unit.Dimensionless) | ||||
| 	_, _ = i.Instrument(inst) | ||||
| } | ||||
|  | ||||
| func TestInvalidInstrumentShouldPanic(t *testing.T) { | ||||
| @@ -313,22 +315,24 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) { | ||||
| 	inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} | ||||
| 	inst := instProviderKey{Name: "foo", Kind: view.SyncCounter} | ||||
|  | ||||
| 	c := newInstrumentCache[int64](nil, nil) | ||||
| 	r := newResolver(p, c) | ||||
| 	aggs, err := r.Aggregators(inst, unit.Dimensionless) | ||||
| 	s := instrumentation.Scope{Name: "testPipelineRegistryResolveIntAggregators"} | ||||
| 	r := newResolver(s, p, c) | ||||
| 	aggs, err := r.Aggregators(inst) | ||||
| 	assert.NoError(t, err) | ||||
|  | ||||
| 	require.Len(t, aggs, wantCount) | ||||
| } | ||||
|  | ||||
| func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) { | ||||
| 	inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} | ||||
| 	inst := instProviderKey{Name: "foo", Kind: view.SyncCounter} | ||||
|  | ||||
| 	c := newInstrumentCache[float64](nil, nil) | ||||
| 	r := newResolver(p, c) | ||||
| 	aggs, err := r.Aggregators(inst, unit.Dimensionless) | ||||
| 	s := instrumentation.Scope{Name: "testPipelineRegistryResolveFloatAggregators"} | ||||
| 	r := newResolver(s, p, c) | ||||
| 	aggs, err := r.Aggregators(inst) | ||||
| 	assert.NoError(t, err) | ||||
|  | ||||
| 	require.Len(t, aggs, wantCount) | ||||
| @@ -352,16 +356,17 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { | ||||
| 	readers := []Reader{testRdrHistogram} | ||||
| 	views := []view.View{{}} | ||||
| 	p := newPipelines(resource.Empty(), readers, views) | ||||
| 	inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} | ||||
| 	inst := instProviderKey{Name: "foo", Kind: view.AsyncGauge} | ||||
|  | ||||
| 	vc := cache[string, instrumentID]{} | ||||
| 	ri := newResolver(p, newInstrumentCache[int64](nil, &vc)) | ||||
| 	intAggs, err := ri.Aggregators(inst, unit.Dimensionless) | ||||
| 	s := instrumentation.Scope{Name: "TestPipelineRegistryCreateAggregatorsIncompatibleInstrument"} | ||||
| 	ri := newResolver(s, p, newInstrumentCache[int64](nil, &vc)) | ||||
| 	intAggs, err := ri.Aggregators(inst) | ||||
| 	assert.Error(t, err) | ||||
| 	assert.Len(t, intAggs, 0) | ||||
|  | ||||
| 	rf := newResolver(p, newInstrumentCache[float64](nil, &vc)) | ||||
| 	floatAggs, err := rf.Aggregators(inst, unit.Dimensionless) | ||||
| 	rf := newResolver(s, p, newInstrumentCache[float64](nil, &vc)) | ||||
| 	floatAggs, err := rf.Aggregators(inst) | ||||
| 	assert.Error(t, err) | ||||
| 	assert.Len(t, floatAggs, 0) | ||||
| } | ||||
| @@ -393,41 +398,42 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) { | ||||
| 	readers := []Reader{NewManualReader()} | ||||
| 	views := []view.View{{}, renameView} | ||||
|  | ||||
| 	fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter} | ||||
| 	barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter} | ||||
| 	fooInst := instProviderKey{Name: "foo", Kind: view.SyncCounter} | ||||
| 	barInst := instProviderKey{Name: "bar", Kind: view.SyncCounter} | ||||
|  | ||||
| 	p := newPipelines(resource.Empty(), readers, views) | ||||
|  | ||||
| 	vc := cache[string, instrumentID]{} | ||||
| 	ri := newResolver(p, newInstrumentCache[int64](nil, &vc)) | ||||
| 	intAggs, err := ri.Aggregators(fooInst, unit.Dimensionless) | ||||
| 	s := instrumentation.Scope{Name: "TestPipelineRegistryCreateAggregatorsDuplicateErrors"} | ||||
| 	ri := newResolver(s, p, newInstrumentCache[int64](nil, &vc)) | ||||
| 	intAggs, err := ri.Aggregators(fooInst) | ||||
| 	assert.NoError(t, err) | ||||
| 	assert.Equal(t, 0, l.InfoN(), "no info logging should happen") | ||||
| 	assert.Len(t, intAggs, 1) | ||||
|  | ||||
| 	// The Rename view should produce the same instrument without an error, the | ||||
| 	// default view should also cause a new aggregator to be returned. | ||||
| 	intAggs, err = ri.Aggregators(barInst, unit.Dimensionless) | ||||
| 	intAggs, err = ri.Aggregators(barInst) | ||||
| 	assert.NoError(t, err) | ||||
| 	assert.Equal(t, 0, l.InfoN(), "no info logging should happen") | ||||
| 	assert.Len(t, intAggs, 2) | ||||
|  | ||||
| 	// Creating a float foo instrument should log a warning because there is an | ||||
| 	// int foo instrument. | ||||
| 	rf := newResolver(p, newInstrumentCache[float64](nil, &vc)) | ||||
| 	floatAggs, err := rf.Aggregators(fooInst, unit.Dimensionless) | ||||
| 	rf := newResolver(s, p, newInstrumentCache[float64](nil, &vc)) | ||||
| 	floatAggs, err := rf.Aggregators(fooInst) | ||||
| 	assert.NoError(t, err) | ||||
| 	assert.Equal(t, 1, l.InfoN(), "instrument conflict not logged") | ||||
| 	assert.Len(t, floatAggs, 1) | ||||
|  | ||||
| 	fooInst = view.Instrument{Name: "foo-float", Kind: view.SyncCounter} | ||||
| 	fooInst = instProviderKey{Name: "foo-float", Kind: view.SyncCounter} | ||||
|  | ||||
| 	floatAggs, err = rf.Aggregators(fooInst, unit.Dimensionless) | ||||
| 	floatAggs, err = rf.Aggregators(fooInst) | ||||
| 	assert.NoError(t, err) | ||||
| 	assert.Equal(t, 0, l.InfoN(), "no info logging should happen") | ||||
| 	assert.Len(t, floatAggs, 1) | ||||
|  | ||||
| 	floatAggs, err = rf.Aggregators(barInst, unit.Dimensionless) | ||||
| 	floatAggs, err = rf.Aggregators(barInst) | ||||
| 	assert.NoError(t, err) | ||||
| 	// Both the rename and default view aggregators created above should now | ||||
| 	// conflict. Therefore, 2 warning messages should be logged. | ||||
|   | ||||
| @@ -26,7 +26,6 @@ import ( | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| 	"go.opentelemetry.io/otel/metric/unit" | ||||
| 	"go.opentelemetry.io/otel/sdk/instrumentation" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregation" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/metricdata" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/view" | ||||
| @@ -135,12 +134,12 @@ func TestDefaultViewImplicit(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { | ||||
| 	inst := view.Instrument{ | ||||
| 		Scope:       instrumentation.Scope{Name: "testing/lib"}, | ||||
| 	scope := instrumentation.Scope{Name: "testing/lib"} | ||||
| 	inst := instProviderKey{ | ||||
| 		Name:        "requests", | ||||
| 		Description: "count of requests received", | ||||
| 		Kind:        view.SyncCounter, | ||||
| 		Aggregation: aggregation.Sum{}, | ||||
| 		Unit:        unit.Dimensionless, | ||||
| 	} | ||||
| 	return func(t *testing.T) { | ||||
| 		reader := NewManualReader() | ||||
| @@ -164,8 +163,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { | ||||
| 		for _, test := range tests { | ||||
| 			t.Run(test.name, func(t *testing.T) { | ||||
| 				c := newInstrumentCache[N](nil, nil) | ||||
| 				i := newInserter(test.pipe, c) | ||||
| 				got, err := i.Instrument(inst, unit.Dimensionless) | ||||
| 				i := newInserter(scope, test.pipe, c) | ||||
| 				got, err := i.Instrument(inst) | ||||
| 				require.NoError(t, err) | ||||
| 				assert.Len(t, got, 1, "default view not applied") | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user