You've already forked opentelemetry-go
							
							
				mirror of
				https://github.com/open-telemetry/opentelemetry-go.git
				synced 2025-10-31 00:07:40 +02:00 
			
		
		
		
	Add the UpDownSumObserver instrument (#750)
* Add the UpDownSumObserver instrument * Precommit * Downcase error message
This commit is contained in:
		| @@ -224,6 +224,28 @@ func TestObserverInstruments(t *testing.T) { | ||||
| 			-142, | ||||
| 		) | ||||
| 	}) | ||||
| 	t.Run("float updownsumobserver", func(t *testing.T) { | ||||
| 		labels := []kv.KeyValue{kv.String("O", "P")} | ||||
| 		mockSDK, meter := mockTest.NewMeter() | ||||
| 		o := Must(meter).RegisterFloat64UpDownSumObserver("test.updownsumobserver.float", func(_ context.Context, result metric.Float64ObserverResult) { | ||||
| 			result.Observe(42.1, labels...) | ||||
| 		}) | ||||
| 		mockSDK.RunAsyncInstruments() | ||||
| 		checkObserverBatch(t, labels, mockSDK, metric.Float64NumberKind, metric.UpDownSumObserverKind, o.AsyncImpl(), | ||||
| 			42.1, | ||||
| 		) | ||||
| 	}) | ||||
| 	t.Run("int updownsumobserver", func(t *testing.T) { | ||||
| 		labels := []kv.KeyValue{} | ||||
| 		mockSDK, meter := mockTest.NewMeter() | ||||
| 		o := Must(meter).RegisterInt64UpDownSumObserver("test.observer.int", func(_ context.Context, result metric.Int64ObserverResult) { | ||||
| 			result.Observe(-142, labels...) | ||||
| 		}) | ||||
| 		mockSDK.RunAsyncInstruments() | ||||
| 		checkObserverBatch(t, labels, mockSDK, metric.Int64NumberKind, metric.UpDownSumObserverKind, o.AsyncImpl(), | ||||
| 			-142, | ||||
| 		) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func checkSyncBatches(t *testing.T, ctx context.Context, labels []kv.KeyValue, mock *mockTest.MeterImpl, nkind metric.NumberKind, mkind metric.Kind, instrument metric.InstrumentImpl, expected ...float64) { | ||||
|   | ||||
| @@ -203,3 +203,15 @@ func wrapFloat64SumObserverInstrument(asyncInst AsyncImpl, err error) (Float64Su | ||||
| 	common, err := checkNewAsync(asyncInst, err) | ||||
| 	return Float64SumObserver{asyncInstrument: common}, err | ||||
| } | ||||
|  | ||||
| // wrapInt64UpDownSumObserverInstrument converts an AsyncImpl into Int64UpDownSumObserver. | ||||
| func wrapInt64UpDownSumObserverInstrument(asyncInst AsyncImpl, err error) (Int64UpDownSumObserver, error) { | ||||
| 	common, err := checkNewAsync(asyncInst, err) | ||||
| 	return Int64UpDownSumObserver{asyncInstrument: common}, err | ||||
| } | ||||
|  | ||||
| // wrapFloat64UpDownSumObserverInstrument converts an AsyncImpl into Float64UpDownSumObserver. | ||||
| func wrapFloat64UpDownSumObserverInstrument(asyncInst AsyncImpl, err error) (Float64UpDownSumObserver, error) { | ||||
| 	common, err := checkNewAsync(asyncInst, err) | ||||
| 	return Float64UpDownSumObserver{asyncInstrument: common}, err | ||||
| } | ||||
|   | ||||
| @@ -32,4 +32,6 @@ const ( | ||||
|  | ||||
| 	// SumObserverKind indicates a SumObserver instrument. | ||||
| 	SumObserverKind | ||||
| 	// UpDownSumObserverKind indicates a UpDownSumObserver instrument. | ||||
| 	UpDownSumObserverKind | ||||
| ) | ||||
|   | ||||
| @@ -13,11 +13,12 @@ func _() { | ||||
| 	_ = x[CounterKind-2] | ||||
| 	_ = x[UpDownCounterKind-3] | ||||
| 	_ = x[SumObserverKind-4] | ||||
| 	_ = x[UpDownSumObserverKind-5] | ||||
| } | ||||
|  | ||||
| const _Kind_name = "ValueRecorderKindValueObserverKindCounterKindUpDownCounterKindSumObserverKind" | ||||
| const _Kind_name = "ValueRecorderKindValueObserverKindCounterKindUpDownCounterKindSumObserverKindUpDownSumObserverKind" | ||||
|  | ||||
| var _Kind_index = [...]uint8{0, 17, 34, 45, 62, 77} | ||||
| var _Kind_index = [...]uint8{0, 17, 34, 45, 62, 77, 98} | ||||
|  | ||||
| func (i Kind) String() string { | ||||
| 	if i < 0 || i >= Kind(len(_Kind_index)-1) { | ||||
|   | ||||
| @@ -170,6 +170,32 @@ func (m Meter) RegisterFloat64SumObserver(name string, callback Float64ObserverC | ||||
| 			newFloat64AsyncRunner(callback))) | ||||
| } | ||||
|  | ||||
| // RegisterInt64UpDownSumObserver creates a new integer UpDownSumObserver instrument | ||||
| // with the given name, running a given callback, and customized with | ||||
| // options.  May return an error if the name is invalid (e.g., empty) | ||||
| // or improperly registered (e.g., duplicate registration). | ||||
| func (m Meter) RegisterInt64UpDownSumObserver(name string, callback Int64ObserverCallback, opts ...Option) (Int64UpDownSumObserver, error) { | ||||
| 	if callback == nil { | ||||
| 		return wrapInt64UpDownSumObserverInstrument(NoopAsync{}, nil) | ||||
| 	} | ||||
| 	return wrapInt64UpDownSumObserverInstrument( | ||||
| 		m.newAsync(name, UpDownSumObserverKind, Int64NumberKind, opts, | ||||
| 			newInt64AsyncRunner(callback))) | ||||
| } | ||||
|  | ||||
| // RegisterFloat64UpDownSumObserver creates a new floating point UpDownSumObserver with | ||||
| // the given name, running a given callback, and customized with | ||||
| // options.  May return an error if the name is invalid (e.g., empty) | ||||
| // or improperly registered (e.g., duplicate registration). | ||||
| func (m Meter) RegisterFloat64UpDownSumObserver(name string, callback Float64ObserverCallback, opts ...Option) (Float64UpDownSumObserver, error) { | ||||
| 	if callback == nil { | ||||
| 		return wrapFloat64UpDownSumObserverInstrument(NoopAsync{}, nil) | ||||
| 	} | ||||
| 	return wrapFloat64UpDownSumObserverInstrument( | ||||
| 		m.newAsync(name, UpDownSumObserverKind, Float64NumberKind, opts, | ||||
| 			newFloat64AsyncRunner(callback))) | ||||
| } | ||||
|  | ||||
| // RegisterInt64ValueObserver creates a new integer ValueObserver instrument | ||||
| // with the given name, running in a batch callback, and customized with | ||||
| // options.  May return an error if the name is invalid (e.g., empty) | ||||
| @@ -220,6 +246,31 @@ func (b BatchObserver) RegisterFloat64SumObserver(name string, opts ...Option) ( | ||||
| 			b.runner)) | ||||
| } | ||||
|  | ||||
| // RegisterInt64UpDownSumObserver creates a new integer UpDownSumObserver instrument | ||||
| // with the given name, running in a batch callback, and customized with | ||||
| // options.  May return an error if the name is invalid (e.g., empty) | ||||
| // or improperly registered (e.g., duplicate registration). | ||||
| func (b BatchObserver) RegisterInt64UpDownSumObserver(name string, opts ...Option) (Int64UpDownSumObserver, error) { | ||||
| 	if b.runner == nil { | ||||
| 		return wrapInt64UpDownSumObserverInstrument(NoopAsync{}, nil) | ||||
| 	} | ||||
| 	return wrapInt64UpDownSumObserverInstrument( | ||||
| 		b.meter.newAsync(name, UpDownSumObserverKind, Int64NumberKind, opts, b.runner)) | ||||
| } | ||||
|  | ||||
| // RegisterFloat64UpDownSumObserver creates a new floating point UpDownSumObserver with | ||||
| // the given name, running in a batch callback, and customized with | ||||
| // options.  May return an error if the name is invalid (e.g., empty) | ||||
| // or improperly registered (e.g., duplicate registration). | ||||
| func (b BatchObserver) RegisterFloat64UpDownSumObserver(name string, opts ...Option) (Float64UpDownSumObserver, error) { | ||||
| 	if b.runner == nil { | ||||
| 		return wrapFloat64UpDownSumObserverInstrument(NoopAsync{}, nil) | ||||
| 	} | ||||
| 	return wrapFloat64UpDownSumObserverInstrument( | ||||
| 		b.meter.newAsync(name, UpDownSumObserverKind, Float64NumberKind, opts, | ||||
| 			b.runner)) | ||||
| } | ||||
|  | ||||
| // MeterImpl returns the underlying MeterImpl of this Meter. | ||||
| func (m Meter) MeterImpl() MeterImpl { | ||||
| 	return m.impl | ||||
|   | ||||
| @@ -133,6 +133,26 @@ func (mm MeterMust) RegisterFloat64SumObserver(name string, callback Float64Obse | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // RegisterInt64UpDownSumObserver calls `Meter.RegisterInt64UpDownSumObserver` and | ||||
| // returns the instrument, panicking if it encounters an error. | ||||
| func (mm MeterMust) RegisterInt64UpDownSumObserver(name string, callback Int64ObserverCallback, oos ...Option) Int64UpDownSumObserver { | ||||
| 	if inst, err := mm.meter.RegisterInt64UpDownSumObserver(name, callback, oos...); err != nil { | ||||
| 		panic(err) | ||||
| 	} else { | ||||
| 		return inst | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // RegisterFloat64UpDownSumObserver calls `Meter.RegisterFloat64UpDownSumObserver` and | ||||
| // returns the instrument, panicking if it encounters an error. | ||||
| func (mm MeterMust) RegisterFloat64UpDownSumObserver(name string, callback Float64ObserverCallback, oos ...Option) Float64UpDownSumObserver { | ||||
| 	if inst, err := mm.meter.RegisterFloat64UpDownSumObserver(name, callback, oos...); err != nil { | ||||
| 		panic(err) | ||||
| 	} else { | ||||
| 		return inst | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // NewBatchObserver returns a wrapper around BatchObserver that panics | ||||
| // when any instrument constructor returns an error. | ||||
| func (mm MeterMust) NewBatchObserver(callback BatchObserverCallback) BatchObserverMust { | ||||
| @@ -180,3 +200,23 @@ func (bm BatchObserverMust) RegisterFloat64SumObserver(name string, oos ...Optio | ||||
| 		return inst | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // RegisterInt64UpDownSumObserver calls `BatchObserver.RegisterInt64UpDownSumObserver` and | ||||
| // returns the instrument, panicking if it encounters an error. | ||||
| func (bm BatchObserverMust) RegisterInt64UpDownSumObserver(name string, oos ...Option) Int64UpDownSumObserver { | ||||
| 	if inst, err := bm.batch.RegisterInt64UpDownSumObserver(name, oos...); err != nil { | ||||
| 		panic(err) | ||||
| 	} else { | ||||
| 		return inst | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // RegisterFloat64UpDownSumObserver calls `BatchObserver.RegisterFloat64UpDownSumObserver` and | ||||
| // returns the instrument, panicking if it encounters an error. | ||||
| func (bm BatchObserverMust) RegisterFloat64UpDownSumObserver(name string, oos ...Option) Float64UpDownSumObserver { | ||||
| 	if inst, err := bm.batch.RegisterFloat64UpDownSumObserver(name, oos...); err != nil { | ||||
| 		panic(err) | ||||
| 	} else { | ||||
| 		return inst | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -45,6 +45,18 @@ type Float64SumObserver struct { | ||||
| 	asyncInstrument | ||||
| } | ||||
|  | ||||
| // Int64UpDownSumObserver is a metric that captures a precomputed sum of | ||||
| // int64 values at a point in time. | ||||
| type Int64UpDownSumObserver struct { | ||||
| 	asyncInstrument | ||||
| } | ||||
|  | ||||
| // Float64UpDownSumObserver is a metric that captures a precomputed sum of | ||||
| // float64 values at a point in time. | ||||
| type Float64UpDownSumObserver struct { | ||||
| 	asyncInstrument | ||||
| } | ||||
|  | ||||
| // Observation returns an Observation, a BatchObserverCallback | ||||
| // argument, for an asynchronous integer instrument. | ||||
| // This returns an implementation-level object for use by the SDK, | ||||
| @@ -88,3 +100,25 @@ func (f Float64SumObserver) Observation(v float64) Observation { | ||||
| 		instrument: f.instrument, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Observation returns an Observation, a BatchObserverCallback | ||||
| // argument, for an asynchronous integer instrument. | ||||
| // This returns an implementation-level object for use by the SDK, | ||||
| // users should not refer to this. | ||||
| func (i Int64UpDownSumObserver) Observation(v int64) Observation { | ||||
| 	return Observation{ | ||||
| 		number:     NewInt64Number(v), | ||||
| 		instrument: i.instrument, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Observation returns an Observation, a BatchObserverCallback | ||||
| // argument, for an asynchronous integer instrument. | ||||
| // This returns an implementation-level object for use by the SDK, | ||||
| // users should not refer to this. | ||||
| func (f Float64UpDownSumObserver) Observation(v float64) Observation { | ||||
| 	return Observation{ | ||||
| 		number:     NewFloat64Number(v), | ||||
| 		instrument: f.instrument, | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -347,27 +347,45 @@ func TestObserverCollection(t *testing.T) { | ||||
| 		result.Observe(1) | ||||
| 	}) | ||||
|  | ||||
| 	_ = Must(meter).RegisterFloat64UpDownSumObserver("float.updownsumobserver", func(_ context.Context, result metric.Float64ObserverResult) { | ||||
| 		result.Observe(1, kv.String("A", "B")) | ||||
| 		result.Observe(-2, kv.String("A", "B")) | ||||
| 		result.Observe(1, kv.String("C", "D")) | ||||
| 	}) | ||||
| 	_ = Must(meter).RegisterInt64UpDownSumObserver("int.updownsumobserver", func(_ context.Context, result metric.Int64ObserverResult) { | ||||
| 		result.Observe(2, kv.String("A", "B")) | ||||
| 		result.Observe(1) | ||||
| 		// last value wins | ||||
| 		result.Observe(1, kv.String("A", "B")) | ||||
| 		result.Observe(-1) | ||||
| 	}) | ||||
|  | ||||
| 	_ = Must(meter).RegisterInt64ValueObserver("empty.valueobserver", func(_ context.Context, result metric.Int64ObserverResult) { | ||||
| 	}) | ||||
|  | ||||
| 	collected := sdk.Collect(ctx) | ||||
|  | ||||
| 	require.Equal(t, 8, collected) | ||||
| 	require.Equal(t, 8, len(integrator.records)) | ||||
| 	require.Equal(t, collected, len(integrator.records)) | ||||
|  | ||||
| 	out := batchTest.NewOutput(label.DefaultEncoder()) | ||||
| 	for _, rec := range integrator.records { | ||||
| 		_ = out.AddTo(rec) | ||||
| 	} | ||||
| 	require.EqualValues(t, map[string]float64{ | ||||
| 		"float.sumobserver/A=B/R=V":   2, | ||||
| 		"float.sumobserver/C=D/R=V":   1, | ||||
| 		"int.sumobserver//R=V":        1, | ||||
| 		"int.sumobserver/A=B/R=V":     1, | ||||
| 		"float.valueobserver/A=B/R=V": -1, | ||||
| 		"float.valueobserver/C=D/R=V": -1, | ||||
| 		"int.valueobserver//R=V":      1, | ||||
| 		"int.valueobserver/A=B/R=V":   1, | ||||
|  | ||||
| 		"float.sumobserver/A=B/R=V": 2, | ||||
| 		"float.sumobserver/C=D/R=V": 1, | ||||
| 		"int.sumobserver//R=V":      1, | ||||
| 		"int.sumobserver/A=B/R=V":   1, | ||||
|  | ||||
| 		"float.updownsumobserver/A=B/R=V": -2, | ||||
| 		"float.updownsumobserver/C=D/R=V": 1, | ||||
| 		"int.updownsumobserver//R=V":      -1, | ||||
| 		"int.updownsumobserver/A=B/R=V":   1, | ||||
| 	}, out.Map) | ||||
| } | ||||
|  | ||||
| @@ -405,6 +423,8 @@ func TestObserverBatch(t *testing.T) { | ||||
| 	var intValueObs metric.Int64ValueObserver | ||||
| 	var floatSumObs metric.Float64SumObserver | ||||
| 	var intSumObs metric.Int64SumObserver | ||||
| 	var floatUpDownSumObs metric.Float64UpDownSumObserver | ||||
| 	var intUpDownSumObs metric.Int64UpDownSumObserver | ||||
|  | ||||
| 	var batch = Must(meter).NewBatchObserver( | ||||
| 		func(_ context.Context, result metric.BatchObserverResult) { | ||||
| @@ -418,6 +438,8 @@ func TestObserverBatch(t *testing.T) { | ||||
| 				intValueObs.Observation(1), | ||||
| 				floatSumObs.Observation(1000), | ||||
| 				intSumObs.Observation(100), | ||||
| 				floatUpDownSumObs.Observation(-1000), | ||||
| 				intUpDownSumObs.Observation(-100), | ||||
| 			) | ||||
| 			result.Observe( | ||||
| 				[]kv.KeyValue{ | ||||
| @@ -425,6 +447,7 @@ func TestObserverBatch(t *testing.T) { | ||||
| 				}, | ||||
| 				floatValueObs.Observation(-1), | ||||
| 				floatSumObs.Observation(-1), | ||||
| 				floatUpDownSumObs.Observation(-1), | ||||
| 			) | ||||
| 			result.Observe( | ||||
| 				nil, | ||||
| @@ -432,27 +455,35 @@ func TestObserverBatch(t *testing.T) { | ||||
| 				intValueObs.Observation(1), | ||||
| 				intSumObs.Observation(10), | ||||
| 				floatSumObs.Observation(1.1), | ||||
| 				intUpDownSumObs.Observation(10), | ||||
| 			) | ||||
| 		}) | ||||
| 	floatValueObs = batch.RegisterFloat64ValueObserver("float.valueobserver") | ||||
| 	intValueObs = batch.RegisterInt64ValueObserver("int.valueobserver") | ||||
| 	floatSumObs = batch.RegisterFloat64SumObserver("float.sumobserver") | ||||
| 	intSumObs = batch.RegisterInt64SumObserver("int.sumobserver") | ||||
| 	floatUpDownSumObs = batch.RegisterFloat64UpDownSumObserver("float.updownsumobserver") | ||||
| 	intUpDownSumObs = batch.RegisterInt64UpDownSumObserver("int.updownsumobserver") | ||||
|  | ||||
| 	collected := sdk.Collect(ctx) | ||||
|  | ||||
| 	require.Equal(t, 8, collected) | ||||
| 	require.Equal(t, 8, len(integrator.records)) | ||||
| 	require.Equal(t, collected, len(integrator.records)) | ||||
|  | ||||
| 	out := batchTest.NewOutput(label.DefaultEncoder()) | ||||
| 	for _, rec := range integrator.records { | ||||
| 		_ = out.AddTo(rec) | ||||
| 	} | ||||
| 	require.EqualValues(t, map[string]float64{ | ||||
| 		"float.sumobserver//R=V":      1.1, | ||||
| 		"float.sumobserver/A=B/R=V":   1000, | ||||
| 		"int.sumobserver//R=V":        10, | ||||
| 		"int.sumobserver/A=B/R=V":     100, | ||||
| 		"float.sumobserver//R=V":    1.1, | ||||
| 		"float.sumobserver/A=B/R=V": 1000, | ||||
| 		"int.sumobserver//R=V":      10, | ||||
| 		"int.sumobserver/A=B/R=V":   100, | ||||
|  | ||||
| 		"int.updownsumobserver/A=B/R=V":   -100, | ||||
| 		"float.updownsumobserver/A=B/R=V": -1000, | ||||
| 		"int.updownsumobserver//R=V":      10, | ||||
| 		"float.updownsumobserver/C=D/R=V": -1, | ||||
|  | ||||
| 		"float.valueobserver/A=B/R=V": -1, | ||||
| 		"float.valueobserver/C=D/R=V": -1, | ||||
| 		"int.valueobserver//R=V":      1, | ||||
| @@ -515,6 +546,42 @@ func TestRecordPersistence(t *testing.T) { | ||||
| 	require.Equal(t, int64(2), integrator.newAggCount) | ||||
| } | ||||
|  | ||||
| func TestIncorrectInstruments(t *testing.T) { | ||||
| 	// The Batch observe/record APIs are susceptible to | ||||
| 	// uninitialized instruments. | ||||
| 	var counter metric.Int64Counter | ||||
| 	var observer metric.Int64ValueObserver | ||||
|  | ||||
| 	ctx := context.Background() | ||||
| 	meter, sdk, integrator := newSDK(t) | ||||
|  | ||||
| 	// Now try with uninitialized instruments. | ||||
| 	meter.RecordBatch(ctx, nil, counter.Measurement(1)) | ||||
| 	meter.NewBatchObserver(func(_ context.Context, result metric.BatchObserverResult) { | ||||
| 		result.Observe(nil, observer.Observation(1)) | ||||
| 	}) | ||||
|  | ||||
| 	collected := sdk.Collect(ctx) | ||||
| 	require.Equal(t, metricsdk.ErrUninitializedInstrument, integrator.sdkErr()) | ||||
| 	require.Equal(t, 0, collected) | ||||
|  | ||||
| 	// Now try with instruments from another SDK. | ||||
| 	var noopMeter metric.Meter | ||||
| 	counter = metric.Must(noopMeter).NewInt64Counter("counter") | ||||
| 	observer = metric.Must(noopMeter).NewBatchObserver( | ||||
| 		func(context.Context, metric.BatchObserverResult) {}, | ||||
| 	).RegisterInt64ValueObserver("observer") | ||||
|  | ||||
| 	meter.RecordBatch(ctx, nil, counter.Measurement(1)) | ||||
| 	meter.NewBatchObserver(func(_ context.Context, result metric.BatchObserverResult) { | ||||
| 		result.Observe(nil, observer.Observation(1)) | ||||
| 	}) | ||||
|  | ||||
| 	collected = sdk.Collect(ctx) | ||||
| 	require.Equal(t, 0, collected) | ||||
| 	require.Equal(t, metricsdk.ErrUninitializedInstrument, integrator.sdkErr()) | ||||
| } | ||||
|  | ||||
| func TestSyncInAsync(t *testing.T) { | ||||
| 	ctx := context.Background() | ||||
| 	meter, sdk, integrator := newSDK(t) | ||||
|   | ||||
| @@ -152,6 +152,8 @@ var ( | ||||
| 	_ api.AsyncImpl     = &asyncInstrument{} | ||||
| 	_ api.SyncImpl      = &syncInstrument{} | ||||
| 	_ api.BoundSyncImpl = &record{} | ||||
|  | ||||
| 	ErrUninitializedInstrument = fmt.Errorf("use of an uninitialized instrument") | ||||
| ) | ||||
|  | ||||
| func (inst *instrument) Descriptor() api.Descriptor { | ||||
| @@ -422,8 +424,9 @@ func (m *Accumulator) CollectAsync(kv []kv.KeyValue, obs ...metric.Observation) | ||||
| 	labels := label.NewSetWithSortable(kv, &m.asyncSortSlice) | ||||
|  | ||||
| 	for _, ob := range obs { | ||||
| 		a := ob.AsyncImpl().Implementation().(*asyncInstrument) | ||||
| 		a.observe(ob.Number(), &labels) | ||||
| 		if a := m.fromAsync(ob.AsyncImpl()); a != nil { | ||||
| 			a.observe(ob.Number(), &labels) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -438,8 +441,9 @@ func (m *Accumulator) observeAsyncInstruments(ctx context.Context) int { | ||||
| 	m.asyncContext = nil | ||||
|  | ||||
| 	for _, inst := range m.asyncInstruments.Instruments() { | ||||
| 		a := inst.Implementation().(*asyncInstrument) | ||||
| 		asyncCollected += m.checkpointAsync(a) | ||||
| 		if a := m.fromAsync(inst); a != nil { | ||||
| 			asyncCollected += m.checkpointAsync(a) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return asyncCollected | ||||
| @@ -494,8 +498,10 @@ func (m *Accumulator) RecordBatch(ctx context.Context, kvs []kv.KeyValue, measur | ||||
| 	// ordered labels. | ||||
| 	var labelsPtr *label.Set | ||||
| 	for i, meas := range measurements { | ||||
| 		s := meas.SyncImpl().Implementation().(*syncInstrument) | ||||
|  | ||||
| 		s := m.fromSync(meas.SyncImpl()) | ||||
| 		if s == nil { | ||||
| 			continue | ||||
| 		} | ||||
| 		h := s.acquireHandle(kvs, labelsPtr) | ||||
|  | ||||
| 		// Re-use labels for the next measurement. | ||||
| @@ -538,3 +544,27 @@ func (r *record) mapkey() mapkey { | ||||
| 		ordered:    r.labels.Equivalent(), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // fromSync gets a sync implementation object, checking for | ||||
| // uninitialized instruments and instruments created by another SDK. | ||||
| func (m *Accumulator) fromSync(sync metric.SyncImpl) *syncInstrument { | ||||
| 	if sync != nil { | ||||
| 		if inst, ok := sync.Implementation().(*syncInstrument); ok { | ||||
| 			return inst | ||||
| 		} | ||||
| 	} | ||||
| 	m.errorHandler(ErrUninitializedInstrument) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // fromSync gets an async implementation object, checking for | ||||
| // uninitialized instruments and instruments created by another SDK. | ||||
| func (m *Accumulator) fromAsync(async metric.AsyncImpl) *asyncInstrument { | ||||
| 	if async != nil { | ||||
| 		if inst, ok := async.Implementation().(*asyncInstrument); ok { | ||||
| 			return inst | ||||
| 		} | ||||
| 	} | ||||
| 	m.errorHandler(ErrUninitializedInstrument) | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user