1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-10-31 00:07:40 +02:00

Use inst ID for agg cache key (#4337)

* Use inst ID for agg cache key

Resolve #4201

The specification requires the duplicate instrument conflicts to be
identified based on the instrument identifying fields:

- name
- instrument kind
- unit
- description
- language-level features such as the number type (int64 and float64)

Currently, the conflict detection and aggregation caching are done based
on the stream IDs which include an aggregation name, monotonicity, and
temporality instead of the instrument kind.

This changes the conflict detection and aggregation caching to use the
OpenTelemetry specified fields. This is effectively a no-op given there
is a 1-to-1 mapping of aggregation-name/monotonicity/temporality to
instrument kind (they are all resolved based on the instrument kind).

Additionally, this adds a stringer representation of the
`InstrumentKind`. This is needed for the logging of duplicate instrument
conflicts.

* Add changes to changelog
This commit is contained in:
Tyler Yahn
2023-07-19 09:59:07 -07:00
committed by GitHub
parent c197fe9305
commit 84b2e54671
7 changed files with 61 additions and 45 deletions

View File

@@ -40,6 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Correctly format log messages from the `go.opentelemetry.io/otel/exporters/zipkin` exporter. (#4143)
- Log an error for calls to `NewView` in `go.opentelemetry.io/otel/sdk/metric` that have empty criteria. (#4307)
- Fix `resource.WithHostID()` to not set an empty `host.id`. (#4317)
- Use the instrument identifying fields to cache aggregators and determine duplicate instrument registrations in `go.opentelemetry.io/otel/sdk/metric`. (#4337)
## [1.16.0/0.39.0] 2023-05-18

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//go:generate stringer -type=InstrumentKind -trimprefix=InstrumentKind
package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
@@ -25,7 +27,6 @@ import (
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
var (
@@ -172,23 +173,16 @@ func (s Stream) attributeFilter() attribute.Filter {
}
}
// streamID are the identifying properties of a stream.
type streamID struct {
// instID are the identifying properties of a instrument.
type instID struct {
// Name is the name of the stream.
Name string
// Description is the description of the stream.
Description string
// Kind defines the functional group of the instrument.
Kind InstrumentKind
// Unit is the unit of the stream.
Unit string
// Aggregation is the aggregation data type of the stream.
Aggregation string
// Monotonic is the monotonicity of an instruments data type. This field is
// not used for all data types, so a zero value needs to be understood in the
// context of Aggregation.
Monotonic bool
// Temporality is the temporality of a stream's data type. This field is
// not used by some data types.
Temporality metricdata.Temporality
// Number is the number type of the stream.
Number string
}

View File

@@ -0,0 +1,29 @@
// Code generated by "stringer -type=InstrumentKind -trimprefix=InstrumentKind"; DO NOT EDIT.
package metric
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[instrumentKindUndefined-0]
_ = x[InstrumentKindCounter-1]
_ = x[InstrumentKindUpDownCounter-2]
_ = x[InstrumentKindHistogram-3]
_ = x[InstrumentKindObservableCounter-4]
_ = x[InstrumentKindObservableUpDownCounter-5]
_ = x[InstrumentKindObservableGauge-6]
}
const _InstrumentKind_name = "instrumentKindUndefinedCounterUpDownCounterHistogramObservableCounterObservableUpDownCounterObservableGauge"
var _InstrumentKind_index = [...]uint8{0, 23, 30, 43, 52, 69, 92, 107}
func (i InstrumentKind) String() string {
if i >= InstrumentKind(len(_InstrumentKind_index)-1) {
return "InstrumentKind(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _InstrumentKind_name[_InstrumentKind_index[i]:_InstrumentKind_index[i+1]]
}

View File

@@ -49,7 +49,7 @@ type meter struct {
func newMeter(s instrumentation.Scope, p pipelines) *meter {
// viewCache ensures instrument conflicts, including number conflicts, this
// meter is asked to create are logged to the user.
var viewCache cache[string, streamID]
var viewCache cache[string, instID]
return &meter{
scope: s,

View File

@@ -187,24 +187,24 @@ type inserter[N int64 | float64] struct {
// cache ensures no duplicate aggregate functions are inserted into the
// reader pipeline and if a new request during an instrument creation asks
// for the same aggregate function input the same instance is returned.
aggregators *cache[streamID, aggVal[N]]
aggregators *cache[instID, aggVal[N]]
// views is a cache that holds instrument identifiers for all the
// instruments a Meter has created, it is provided from the Meter that owns
// this inserter. This cache ensures during the creation of instruments
// with the same name but different options (e.g. description, unit) a
// warning message is logged.
views *cache[string, streamID]
views *cache[string, instID]
pipeline *pipeline
}
func newInserter[N int64 | float64](p *pipeline, vc *cache[string, streamID]) *inserter[N] {
func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *inserter[N] {
if vc == nil {
vc = &cache[string, streamID]{}
vc = &cache[string, instID]{}
}
return &inserter[N]{
aggregators: &cache[streamID, aggVal[N]]{},
aggregators: &cache[instID, aggVal[N]]{},
views: vc,
pipeline: p,
}
@@ -320,12 +320,14 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
)
}
id := i.streamID(kind, stream)
id := i.instID(kind, stream)
// If there is a conflict, the specification says the view should
// still be applied and a warning should be logged.
i.logConflict(id)
cv := i.aggregators.Lookup(id, func() aggVal[N] {
b := aggregate.Builder[N]{Temporality: id.Temporality}
b := aggregate.Builder[N]{
Temporality: i.pipeline.reader.temporality(kind),
}
if len(stream.AllowAttributeKeys) > 0 {
b.Filter = stream.attributeFilter()
}
@@ -350,8 +352,8 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
// logConflict validates if an instrument with the same name as id has already
// been created. If that instrument conflicts with id, a warning is logged.
func (i *inserter[N]) logConflict(id streamID) {
existing := i.views.Lookup(id.Name, func() streamID { return id })
func (i *inserter[N]) logConflict(id instID) {
existing := i.views.Lookup(id.Name, func() instID { return id })
if id == existing {
return
}
@@ -360,31 +362,21 @@ func (i *inserter[N]) logConflict(id streamID) {
"duplicate metric stream definitions",
"names", fmt.Sprintf("%q, %q", existing.Name, id.Name),
"descriptions", fmt.Sprintf("%q, %q", existing.Description, id.Description),
"kinds", fmt.Sprintf("%s, %s", existing.Kind, id.Kind),
"units", fmt.Sprintf("%s, %s", existing.Unit, id.Unit),
"numbers", fmt.Sprintf("%s, %s", existing.Number, id.Number),
"aggregations", fmt.Sprintf("%s, %s", existing.Aggregation, id.Aggregation),
"monotonics", fmt.Sprintf("%t, %t", existing.Monotonic, id.Monotonic),
"temporalities", fmt.Sprintf("%s, %s", existing.Temporality.String(), id.Temporality.String()),
)
}
func (i *inserter[N]) streamID(kind InstrumentKind, stream Stream) streamID {
func (i *inserter[N]) instID(kind InstrumentKind, stream Stream) instID {
var zero N
id := streamID{
return instID{
Name: stream.Name,
Description: stream.Description,
Unit: stream.Unit,
Aggregation: fmt.Sprintf("%T", stream.Aggregation),
Temporality: i.pipeline.reader.temporality(kind),
Kind: kind,
Number: fmt.Sprintf("%T", zero),
}
switch kind {
case InstrumentKindObservableCounter, InstrumentKindCounter, InstrumentKindHistogram:
id.Monotonic = true
}
return id
}
// aggregateFunc returns new aggregate functions matching agg, kind, and
@@ -526,7 +518,7 @@ type resolver[N int64 | float64] struct {
inserters []*inserter[N]
}
func newResolver[N int64 | float64](p pipelines, vc *cache[string, streamID]) resolver[N] {
func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) resolver[N] {
in := make([]*inserter[N], len(p))
for i := range in {
in[i] = newInserter[N](p[i], vc)

View File

@@ -350,7 +350,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
var c cache[string, streamID]
var c cache[string, instID]
p := newPipeline(nil, tt.reader, tt.views)
i := newInserter[N](p, &c)
input, err := i.Instrument(tt.inst)
@@ -371,7 +371,7 @@ func TestCreateAggregators(t *testing.T) {
}
func testInvalidInstrumentShouldPanic[N int64 | float64]() {
var c cache[string, streamID]
var c cache[string, instID]
i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}), &c)
inst := Instrument{
Name: "foo",
@@ -391,7 +391,7 @@ func TestPipelinesAggregatorForEachReader(t *testing.T) {
require.Len(t, pipes, 2, "created pipelines")
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, streamID]
var c cache[string, instID]
r := newResolver[int64](pipes, &c)
aggs, err := r.Aggregators(inst)
require.NoError(t, err, "resolved Aggregators error")
@@ -468,7 +468,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {
func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, streamID]
var c cache[string, instID]
r := newResolver[int64](p, &c)
aggs, err := r.Aggregators(inst)
assert.NoError(t, err)
@@ -478,7 +478,7 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo
func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, streamID]
var c cache[string, instID]
r := newResolver[float64](p, &c)
aggs, err := r.Aggregators(inst)
assert.NoError(t, err)
@@ -505,7 +505,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
p := newPipelines(resource.Empty(), readers, views)
inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge}
var vc cache[string, streamID]
var vc cache[string, instID]
ri := newResolver[int64](p, &vc)
intAggs, err := ri.Aggregators(inst)
assert.Error(t, err)
@@ -556,7 +556,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {
p := newPipelines(resource.Empty(), readers, views)
var vc cache[string, streamID]
var vc cache[string, instID]
ri := newResolver[int64](p, &vc)
intAggs, err := ri.Aggregators(fooInst)
assert.NoError(t, err)

View File

@@ -137,7 +137,7 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var c cache[string, streamID]
var c cache[string, instID]
i := newInserter[N](test.pipe, &c)
got, err := i.Instrument(inst)
require.NoError(t, err)