From fcc341767788d40f4d98d353e9f11ba4def556b1 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Mon, 11 Aug 2025 16:15:00 -0400 Subject: [PATCH] sdk/trace: self-observability: batch span processor metrics (#6393) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes https://github.com/open-telemetry/opentelemetry-go/issues/7005 Adds `otel.sdk.processor.span.queue.size`, `otel.sdk.processor.span.queue.capacity`, and `otel.sdk.processor.span.processed.count` metrics to the trace batch span processor. These are defined in https://github.com/open-telemetry/semantic-conventions/blob/cb11bb9bac24f4b0e95ad0f61ce01813d8ceada8/docs/otel/sdk-metrics.md, and are experimental. Because of this, the metrics are behind the OTEL_GO_X_SELF_OBSERVABILITY feature gate. Given the feature is experimental, it always uses the global MeterProvider when enabled. --------- Co-authored-by: Robert Pająk --- CHANGELOG.md | 4 +- sdk/trace/batch_span_processor.go | 87 +++++++- sdk/trace/batch_span_processor_test.go | 269 +++++++++++++++++++++++++ sdk/trace/provider.go | 1 + sdk/trace/tracer.go | 2 +- 5 files changed, 359 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c99e783f1..0cd7435aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,8 +46,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `go.opentelemetry.io/otel/semconv/v1.36.0` package. The package contains semantic conventions from the `v1.36.0` version of the OpenTelemetry Semantic Conventions. See the [migration documentation](./semconv/v1.36.0/MIGRATION.md) for information on how to upgrade from `go.opentelemetry.io/otel/semconv/v1.34.0.`(#7032) -- Add experimental self-observability span metrics in `go.opentelemetry.io/otel/sdk/trace`. - Check the `go.opentelemetry.io/otel/sdk/trace/internal/x` package documentation for more information. (#7027) +- Add experimental self-observability span and batch span processor metrics in `go.opentelemetry.io/otel/sdk/trace`. + Check the `go.opentelemetry.io/otel/sdk/trace/internal/x` package documentation for more information. (#7027, #6393) - Add native histogram exemplar support in `go.opentelemetry.io/otel/exporters/prometheus`. (#6772) - Add experimental self-observability log metrics in `go.opentelemetry.io/otel/sdk/log`. Check the `go.opentelemetry.io/otel/sdk/log/internal/x` package documentation for more information. (#7121) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index 83c72bb88..60056cd3f 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -6,13 +6,20 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace" import ( "context" "errors" + "fmt" "sync" "sync/atomic" "time" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk" "go.opentelemetry.io/otel/sdk/internal/env" + "go.opentelemetry.io/otel/sdk/trace/internal/x" + semconv "go.opentelemetry.io/otel/semconv/v1.36.0" + "go.opentelemetry.io/otel/semconv/v1.36.0/otelconv" "go.opentelemetry.io/otel/trace" ) @@ -26,6 +33,8 @@ const ( DefaultMaxExportBatchSize = 512 ) +var queueFull = otelconv.ErrorTypeAttr("queue_full") + // BatchSpanProcessorOption configures a BatchSpanProcessor. 
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions) @@ -69,6 +78,11 @@ type batchSpanProcessor struct { queue chan ReadOnlySpan dropped uint32 + selfObservabilityEnabled bool + callbackRegistration metric.Registration + spansProcessedCounter otelconv.SDKProcessorSpanProcessed + componentNameAttr attribute.KeyValue + batch []ReadOnlySpan batchMutex sync.Mutex timer *time.Timer @@ -110,6 +124,8 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO stopCh: make(chan struct{}), } + bsp.configureSelfObservability() + bsp.stopWait.Add(1) go func() { defer bsp.stopWait.Done() @@ -120,6 +136,55 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO return bsp } +var processorIDCounter atomic.Int64 + +// nextProcessorID returns an identifier for this batch span processor, +// starting with 0 and incrementing by 1 each time it is called. +func nextProcessorID() int64 { + return processorIDCounter.Add(1) - 1 +} + +// configureSelfObservability configures metrics for the batch span processor. +func (bsp *batchSpanProcessor) configureSelfObservability() { + if !x.SelfObservability.Enabled() { + return + } + bsp.selfObservabilityEnabled = true + bsp.componentNameAttr = semconv.OTelComponentName( + fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, nextProcessorID())) + meter := otel.GetMeterProvider().Meter( + selfObsScopeName, + metric.WithInstrumentationVersion(sdk.Version()), + metric.WithSchemaURL(semconv.SchemaURL), + ) + + queueCapacityUpDownCounter, err := otelconv.NewSDKProcessorSpanQueueCapacity(meter) + if err != nil { + otel.Handle(err) + } + queueSizeUpDownCounter, err := otelconv.NewSDKProcessorSpanQueueSize(meter) + if err != nil { + otel.Handle(err) + } + bsp.spansProcessedCounter, err = otelconv.NewSDKProcessorSpanProcessed(meter) + if err != nil { + otel.Handle(err) + } + + callbackAttributesOpt := metric.WithAttributes(bsp.componentNameAttr, + semconv.OTelComponentTypeBatchingSpanProcessor) + bsp.callbackRegistration, err = meter.RegisterCallback( + func(_ context.Context, o metric.Observer) error { + o.ObserveInt64(queueSizeUpDownCounter.Inst(), int64(len(bsp.queue)), callbackAttributesOpt) + o.ObserveInt64(queueCapacityUpDownCounter.Inst(), int64(bsp.o.MaxQueueSize), callbackAttributesOpt) + return nil + }, + queueSizeUpDownCounter.Inst(), queueCapacityUpDownCounter.Inst()) + if err != nil { + otel.Handle(err) + } +} + // OnStart method does nothing. func (*batchSpanProcessor) OnStart(context.Context, ReadWriteSpan) {} @@ -160,6 +225,9 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { case <-ctx.Done(): err = ctx.Err() } + if bsp.selfObservabilityEnabled { + err = errors.Join(err, bsp.callbackRegistration.Unregister()) + } }) return err } @@ -272,6 +340,11 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { if l := len(bsp.batch); l > 0 { global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped)) + if bsp.selfObservabilityEnabled { + bsp.spansProcessedCounter.Add(ctx, int64(l), + bsp.componentNameAttr, + bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor)) + } err := bsp.e.ExportSpans(ctx, bsp.batch) // A new batch is always created after exporting, even if the batch failed to be exported. 
@@ -380,11 +453,17 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R case bsp.queue <- sd: return true case <-ctx.Done(): + if bsp.selfObservabilityEnabled { + bsp.spansProcessedCounter.Add(ctx, 1, + bsp.componentNameAttr, + bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor), + bsp.spansProcessedCounter.AttrErrorType(queueFull)) + } return false } } -func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) bool { +func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool { if !sd.SpanContext().IsSampled() { return false } @@ -394,6 +473,12 @@ func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) b return true default: atomic.AddUint32(&bsp.dropped, 1) + if bsp.selfObservabilityEnabled { + bsp.spansProcessedCounter.Add(ctx, 1, + bsp.componentNameAttr, + bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor), + bsp.spansProcessedCounter.AttrErrorType(queueFull)) + } } return false } diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 020f11774..9520e5f62 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -8,14 +8,25 @@ import ( "encoding/binary" "errors" "fmt" + "runtime" "sync" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/internal/env" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + semconv "go.opentelemetry.io/otel/semconv/v1.36.0" + "go.opentelemetry.io/otel/semconv/v1.36.0/otelconv" "go.opentelemetry.io/otel/trace" ) @@ -633,3 +644,261 @@ func TestBatchSpanProcessorConcurrentSafe(t *testing.T) { wg.Wait() } + +// Drop metrics not being tested in this test. +var dropSpanMetricsView = sdkmetric.NewView( + sdkmetric.Instrument{ + Name: "otel.sdk.span.*", + }, + sdkmetric.Stream{Aggregation: sdkmetric.AggregationDrop{}}, +) + +func TestBatchSpanProcessorMetricsDisabled(t *testing.T) { + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "false") + tp := basicTracerProvider(t) + reader := sdkmetric.NewManualReader() + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(reader), + sdkmetric.WithView(dropSpanMetricsView), + ) + otel.SetMeterProvider(meterProvider) + me := newBlockingExporter() + t.Cleanup(func() { assert.NoError(t, me.Shutdown(context.Background())) }) + bsp := NewBatchSpanProcessor( + me, + // Make sure timeout doesn't trigger during the test. + WithBatchTimeout(time.Hour), + WithMaxQueueSize(2), + WithMaxExportBatchSize(2), + ) + tp.RegisterSpanProcessor(bsp) + + tr := tp.Tracer("TestBatchSpanProcessorMetricsDisabled") + // Generate 2 spans, which export and block during the export call. + generateSpan(t, tr, testOption{genNumSpans: 2}) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + assert.NoError(t, me.waitForSpans(ctx, 2)) + + // Validate that there are no metrics produced. + gotMetrics := new(metricdata.ResourceMetrics) + assert.NoError(t, reader.Collect(context.Background(), gotMetrics)) + require.Empty(t, gotMetrics.ScopeMetrics) + // Generate 3 spans. 
2 fill the queue, and 1 is dropped because the queue is full. + generateSpan(t, tr, testOption{genNumSpans: 3}) + // Validate that there are no metrics produced. + gotMetrics = new(metricdata.ResourceMetrics) + assert.NoError(t, reader.Collect(context.Background(), gotMetrics)) + require.Empty(t, gotMetrics.ScopeMetrics) +} + +func TestBatchSpanProcessorMetrics(t *testing.T) { + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true") + tp := basicTracerProvider(t) + reader := sdkmetric.NewManualReader() + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(reader), + sdkmetric.WithView(dropSpanMetricsView), + ) + otel.SetMeterProvider(meterProvider) + me := newBlockingExporter() + t.Cleanup(func() { assert.NoError(t, me.Shutdown(context.Background())) }) + bsp := NewBatchSpanProcessor( + me, + // Make sure timeout doesn't trigger during the test. + WithBatchTimeout(time.Hour), + WithMaxQueueSize(2), + WithMaxExportBatchSize(2), + ) + internalBsp := bsp.(*batchSpanProcessor) + tp.RegisterSpanProcessor(bsp) + + tr := tp.Tracer("TestBatchSpanProcessorMetrics") + // Generate 2 spans, which export and block during the export call. + generateSpan(t, tr, testOption{genNumSpans: 2}) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + assert.NoError(t, me.waitForSpans(ctx, 2)) + assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader, + expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2}) + // Generate 3 spans. 2 fill the queue, and 1 is dropped because the queue is full. + generateSpan(t, tr, testOption{genNumSpans: 3}) + assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader, + expectMetrics{queueCapacity: 2, queueSize: 2, queueFullProcessed: 1, successProcessed: 2}) +} + +func TestBatchSpanProcessorBlockingMetrics(t *testing.T) { + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true") + tp := basicTracerProvider(t) + reader := sdkmetric.NewManualReader() + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(reader), + sdkmetric.WithView(dropSpanMetricsView), + ) + otel.SetMeterProvider(meterProvider) + me := newBlockingExporter() + t.Cleanup(func() { assert.NoError(t, me.Shutdown(context.Background())) }) + bsp := NewBatchSpanProcessor( + me, + // Use WithBlocking so we can trigger a queueFull using ForceFlush. + WithBlocking(), + // Make sure timeout doesn't trigger during the test. + WithBatchTimeout(time.Hour), + WithMaxQueueSize(2), + WithMaxExportBatchSize(2), + ) + internalBsp := bsp.(*batchSpanProcessor) + tp.RegisterSpanProcessor(bsp) + + tr := tp.Tracer("TestBatchSpanProcessorBlockingMetrics") + // Generate 2 spans that are exported to the exporter, which blocks. + generateSpan(t, tr, testOption{genNumSpans: 2}) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + assert.NoError(t, me.waitForSpans(ctx, 2)) + assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader, + expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2}) + // Generate 2 spans to fill the queue. + generateSpan(t, tr, testOption{genNumSpans: 2}) + go func() { + // Generate a span which blocks because the queue is full. + generateSpan(t, tr, testOption{genNumSpans: 1}) + }() + assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader, + expectMetrics{queueCapacity: 2, queueSize: 2, successProcessed: 2}) + + // Use ForceFlush to force the span that is blocking on the full queue to be dropped. 
+ ctx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + assert.Error(t, tp.ForceFlush(ctx)) + assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader, + expectMetrics{queueCapacity: 2, queueSize: 2, queueFullProcessed: 1, successProcessed: 2}) +} + +type expectMetrics struct { + queueCapacity int64 + queueSize int64 + successProcessed int64 + queueFullProcessed int64 +} + +func assertSelfObsScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValue, reader sdkmetric.Reader, + expectation expectMetrics, +) { + t.Helper() + gotResourceMetrics := new(metricdata.ResourceMetrics) + assert.NoError(t, reader.Collect(context.Background(), gotResourceMetrics)) + + baseAttrs := attribute.NewSet( + semconv.OTelComponentTypeBatchingSpanProcessor, + componentNameAttr, + ) + wantMetrics := []metricdata.Metrics{ + { + Name: otelconv.SDKProcessorSpanQueueCapacity{}.Name(), + Description: otelconv.SDKProcessorSpanQueueCapacity{}.Description(), + Unit: otelconv.SDKProcessorSpanQueueCapacity{}.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{{Attributes: baseAttrs, Value: expectation.queueCapacity}}, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + }, + }, + { + Name: otelconv.SDKProcessorSpanQueueSize{}.Name(), + Description: otelconv.SDKProcessorSpanQueueSize{}.Description(), + Unit: otelconv.SDKProcessorSpanQueueSize{}.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{{Attributes: baseAttrs, Value: expectation.queueSize}}, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + }, + }, + } + + wantProcessedDataPoints := []metricdata.DataPoint[int64]{} + if expectation.successProcessed > 0 { + wantProcessedDataPoints = append(wantProcessedDataPoints, metricdata.DataPoint[int64]{ + Value: expectation.successProcessed, + Attributes: attribute.NewSet( + semconv.OTelComponentTypeBatchingSpanProcessor, + componentNameAttr, + ), + }) + } + if expectation.queueFullProcessed > 0 { + wantProcessedDataPoints = append(wantProcessedDataPoints, metricdata.DataPoint[int64]{ + Value: expectation.queueFullProcessed, + Attributes: attribute.NewSet( + semconv.OTelComponentTypeBatchingSpanProcessor, + componentNameAttr, + semconv.ErrorTypeKey.String(string(queueFull)), + ), + }) + } + + if len(wantProcessedDataPoints) > 0 { + wantMetrics = append(wantMetrics, + metricdata.Metrics{ + Name: otelconv.SDKProcessorSpanProcessed{}.Name(), + Description: otelconv.SDKProcessorSpanProcessed{}.Description(), + Unit: otelconv.SDKProcessorSpanProcessed{}.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: wantProcessedDataPoints, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + ) + } + + wantScopeMetric := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/otel/sdk/trace", + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, + Metrics: wantMetrics, + } + metricdatatest.AssertEqual(t, wantScopeMetric, gotResourceMetrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) +} + +// blockingExporter blocks until the exported span is removed from the channel. 
+type blockingExporter struct { + shutdown chan struct{} + total atomic.Int32 +} + +func newBlockingExporter() *blockingExporter { + e := &blockingExporter{shutdown: make(chan struct{})} + return e +} + +func (e *blockingExporter) Shutdown(ctx context.Context) error { + select { + case <-e.shutdown: + default: + close(e.shutdown) + } + return ctx.Err() +} + +func (e *blockingExporter) ExportSpans(ctx context.Context, s []ReadOnlySpan) error { + e.total.Add(int32(len(s))) + <-e.shutdown + return ctx.Err() +} + +func (e *blockingExporter) waitForSpans(ctx context.Context, n int32) error { + // Wait for all n spans to reach the export call + for e.total.Load() < n { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for %d spans to be exported", n) + default: + // So the select will not block + } + runtime.Gosched() + } + return nil +} diff --git a/sdk/trace/provider.go b/sdk/trace/provider.go index fd942d23e..62e87fd63 100644 --- a/sdk/trace/provider.go +++ b/sdk/trace/provider.go @@ -20,6 +20,7 @@ import ( const ( defaultTracerName = "go.opentelemetry.io/otel/sdk/tracer" + selfObsScopeName = "go.opentelemetry.io/otel/sdk/trace" ) // tracerProviderConfig. diff --git a/sdk/trace/tracer.go b/sdk/trace/tracer.go index ddb72cb77..9d0ff05a3 100644 --- a/sdk/trace/tracer.go +++ b/sdk/trace/tracer.go @@ -39,7 +39,7 @@ func (tr *tracer) initSelfObservability() { tr.selfObservabilityEnabled = true mp := otel.GetMeterProvider() - m := mp.Meter("go.opentelemetry.io/otel/sdk/trace", + m := mp.Meter(selfObsScopeName, metric.WithInstrumentationVersion(sdk.Version()), metric.WithSchemaURL(semconv.SchemaURL))