You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-09-16 09:26:25 +02:00
Refactor BSP observability setup (#7264)
Do not rely on side-effects from the configureSelfObservability method. Instead, initialize with the new pure newBSPObs func.
This commit is contained in:
@@ -124,7 +124,20 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
|
|||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
bsp.configureSelfObservability()
|
if x.SelfObservability.Enabled() {
|
||||||
|
bsp.selfObservabilityEnabled = true
|
||||||
|
bsp.componentNameAttr = componentName()
|
||||||
|
|
||||||
|
var err error
|
||||||
|
bsp.spansProcessedCounter, bsp.callbackRegistration, err = newBSPObs(
|
||||||
|
bsp.componentNameAttr,
|
||||||
|
func() int64 { return int64(len(bsp.queue)) },
|
||||||
|
int64(bsp.o.MaxQueueSize),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
otel.Handle(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bsp.stopWait.Add(1)
|
bsp.stopWait.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
@@ -144,45 +157,49 @@ func nextProcessorID() int64 {
|
|||||||
return processorIDCounter.Add(1) - 1
|
return processorIDCounter.Add(1) - 1
|
||||||
}
|
}
|
||||||
|
|
||||||
// configureSelfObservability configures metrics for the batch span processor.
|
func componentName() attribute.KeyValue {
|
||||||
func (bsp *batchSpanProcessor) configureSelfObservability() {
|
id := nextProcessorID()
|
||||||
if !x.SelfObservability.Enabled() {
|
name := fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, id)
|
||||||
return
|
return semconv.OTelComponentName(name)
|
||||||
}
|
}
|
||||||
bsp.selfObservabilityEnabled = true
|
|
||||||
bsp.componentNameAttr = semconv.OTelComponentName(
|
// newBSPObs creates and returns a new set of metrics instruments and a
|
||||||
fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, nextProcessorID()))
|
// registration for a BatchSpanProcessor. It is the caller's responsibility
|
||||||
|
// to unregister the registration when it is no longer needed.
|
||||||
|
func newBSPObs(
|
||||||
|
cmpnt attribute.KeyValue,
|
||||||
|
qLen func() int64,
|
||||||
|
qMax int64,
|
||||||
|
) (otelconv.SDKProcessorSpanProcessed, metric.Registration, error) {
|
||||||
meter := otel.GetMeterProvider().Meter(
|
meter := otel.GetMeterProvider().Meter(
|
||||||
selfObsScopeName,
|
selfObsScopeName,
|
||||||
metric.WithInstrumentationVersion(sdk.Version()),
|
metric.WithInstrumentationVersion(sdk.Version()),
|
||||||
metric.WithSchemaURL(semconv.SchemaURL),
|
metric.WithSchemaURL(semconv.SchemaURL),
|
||||||
)
|
)
|
||||||
|
|
||||||
queueCapacityUpDownCounter, err := otelconv.NewSDKProcessorSpanQueueCapacity(meter)
|
qCap, err := otelconv.NewSDKProcessorSpanQueueCapacity(meter)
|
||||||
if err != nil {
|
|
||||||
otel.Handle(err)
|
|
||||||
}
|
|
||||||
queueSizeUpDownCounter, err := otelconv.NewSDKProcessorSpanQueueSize(meter)
|
|
||||||
if err != nil {
|
|
||||||
otel.Handle(err)
|
|
||||||
}
|
|
||||||
bsp.spansProcessedCounter, err = otelconv.NewSDKProcessorSpanProcessed(meter)
|
|
||||||
if err != nil {
|
|
||||||
otel.Handle(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
callbackAttributesOpt := metric.WithAttributes(bsp.componentNameAttr,
|
qSize, e := otelconv.NewSDKProcessorSpanQueueSize(meter)
|
||||||
semconv.OTelComponentTypeBatchingSpanProcessor)
|
err = errors.Join(err, e)
|
||||||
bsp.callbackRegistration, err = meter.RegisterCallback(
|
|
||||||
|
spansProcessed, e := otelconv.NewSDKProcessorSpanProcessed(meter)
|
||||||
|
err = errors.Join(err, e)
|
||||||
|
|
||||||
|
cmpntT := semconv.OTelComponentTypeBatchingSpanProcessor
|
||||||
|
attrs := metric.WithAttributes(cmpnt, cmpntT)
|
||||||
|
|
||||||
|
reg, e := meter.RegisterCallback(
|
||||||
func(_ context.Context, o metric.Observer) error {
|
func(_ context.Context, o metric.Observer) error {
|
||||||
o.ObserveInt64(queueSizeUpDownCounter.Inst(), int64(len(bsp.queue)), callbackAttributesOpt)
|
o.ObserveInt64(qSize.Inst(), qLen(), attrs)
|
||||||
o.ObserveInt64(queueCapacityUpDownCounter.Inst(), int64(bsp.o.MaxQueueSize), callbackAttributesOpt)
|
o.ObserveInt64(qCap.Inst(), qMax, attrs)
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
queueSizeUpDownCounter.Inst(), queueCapacityUpDownCounter.Inst())
|
qSize.Inst(),
|
||||||
if err != nil {
|
qCap.Inst(),
|
||||||
otel.Handle(err)
|
)
|
||||||
}
|
err = errors.Join(err, e)
|
||||||
|
|
||||||
|
return spansProcessed, reg, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnStart method does nothing.
|
// OnStart method does nothing.
|
||||||
|
Reference in New Issue
Block a user