
sdk/trace: self-observability: batch span processor metrics (#6393)

Fixes https://github.com/open-telemetry/opentelemetry-go/issues/7005

Adds `otel.sdk.processor.span.queue.size`,
`otel.sdk.processor.span.queue.capacity`, and
`otel.sdk.processor.span.processed.count` metrics to the trace batch
span processor.

These are defined in
cb11bb9bac/docs/otel/sdk-metrics.md,
and are experimental. Because of this, metrics are behind the
OTEL_GO_X_SELF_OBSERVABILITY feature gate.

Given that the feature is experimental, the processor always uses the
global MeterProvider when the feature is enabled.
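
For illustration, a minimal, hypothetical sketch (not part of this change) of how an application might surface these metrics: it sets the feature-gate environment variable, registers a global MeterProvider backed by a manual reader, and uses a stand-in no-op exporter. The exporter type and the printed output are assumptions for the sketch, not part of the SDK.

package main

import (
	"context"
	"fmt"
	"os"

	"go.opentelemetry.io/otel"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// noopExporter is a stand-in SpanExporter used only for this sketch.
type noopExporter struct{}

func (noopExporter) ExportSpans(context.Context, []sdktrace.ReadOnlySpan) error { return nil }
func (noopExporter) Shutdown(context.Context) error                             { return nil }

func main() {
	// Normally set in the process environment; shown here for completeness.
	os.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true")

	// The batch span processor reports to the global MeterProvider, so
	// register one backed by a manual reader before building the tracer provider.
	reader := sdkmetric.NewManualReader()
	otel.SetMeterProvider(sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)))

	// WithBatcher wires up the batch span processor whose metrics we want.
	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(noopExporter{}))
	defer func() { _ = tp.Shutdown(context.Background()) }()

	// Emit a span and flush so the processed-spans counter is recorded.
	_, span := tp.Tracer("example").Start(context.Background(), "work")
	span.End()
	_ = tp.ForceFlush(context.Background())

	// Collect and list the recorded metrics; they should include the
	// queue size, queue capacity, and processed-count instruments.
	var rm metricdata.ResourceMetrics
	_ = reader.Collect(context.Background(), &rm)
	for _, sm := range rm.ScopeMetrics {
		for _, m := range sm.Metrics {
			fmt.Println(m.Name)
		}
	}
}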

---------

Co-authored-by: Robert Pająk <pellared@hotmail.com>
Author: David Ashpole
Date: 2025-08-11 16:15:00 -04:00
Committed by: GitHub
Parent: c5e68b2010
Commit: fcc3417677
5 changed files with 359 additions and 4 deletions


@@ -6,13 +6,20 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/internal/env"
"go.opentelemetry.io/otel/sdk/trace/internal/x"
semconv "go.opentelemetry.io/otel/semconv/v1.36.0"
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
"go.opentelemetry.io/otel/trace"
)
@@ -26,6 +33,8 @@ const (
DefaultMaxExportBatchSize = 512
)
var queueFull = otelconv.ErrorTypeAttr("queue_full")
// BatchSpanProcessorOption configures a BatchSpanProcessor.
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
@@ -69,6 +78,11 @@ type batchSpanProcessor struct {
queue chan ReadOnlySpan
dropped uint32
selfObservabilityEnabled bool
callbackRegistration metric.Registration
spansProcessedCounter otelconv.SDKProcessorSpanProcessed
componentNameAttr attribute.KeyValue
batch []ReadOnlySpan
batchMutex sync.Mutex
timer *time.Timer
@@ -110,6 +124,8 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
stopCh: make(chan struct{}),
}
bsp.configureSelfObservability()
bsp.stopWait.Add(1)
go func() {
defer bsp.stopWait.Done()
@@ -120,6 +136,55 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
return bsp
}
var processorIDCounter atomic.Int64
// nextProcessorID returns an identifier for this batch span processor,
// starting with 0 and incrementing by 1 each time it is called.
func nextProcessorID() int64 {
return processorIDCounter.Add(1) - 1
}
// configureSelfObservability configures metrics for the batch span processor.
func (bsp *batchSpanProcessor) configureSelfObservability() {
if !x.SelfObservability.Enabled() {
return
}
bsp.selfObservabilityEnabled = true
bsp.componentNameAttr = semconv.OTelComponentName(
fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, nextProcessorID()))
meter := otel.GetMeterProvider().Meter(
selfObsScopeName,
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)
queueCapacityUpDownCounter, err := otelconv.NewSDKProcessorSpanQueueCapacity(meter)
if err != nil {
otel.Handle(err)
}
queueSizeUpDownCounter, err := otelconv.NewSDKProcessorSpanQueueSize(meter)
if err != nil {
otel.Handle(err)
}
bsp.spansProcessedCounter, err = otelconv.NewSDKProcessorSpanProcessed(meter)
if err != nil {
otel.Handle(err)
}
callbackAttributesOpt := metric.WithAttributes(bsp.componentNameAttr,
semconv.OTelComponentTypeBatchingSpanProcessor)
bsp.callbackRegistration, err = meter.RegisterCallback(
func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(queueSizeUpDownCounter.Inst(), int64(len(bsp.queue)), callbackAttributesOpt)
o.ObserveInt64(queueCapacityUpDownCounter.Inst(), int64(bsp.o.MaxQueueSize), callbackAttributesOpt)
return nil
},
queueSizeUpDownCounter.Inst(), queueCapacityUpDownCounter.Inst())
if err != nil {
otel.Handle(err)
}
}
// OnStart method does nothing.
func (*batchSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}
@@ -160,6 +225,9 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
case <-ctx.Done():
err = ctx.Err()
}
if bsp.selfObservabilityEnabled {
err = errors.Join(err, bsp.callbackRegistration.Unregister())
}
})
return err
}
@@ -272,6 +340,11 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
if l := len(bsp.batch); l > 0 {
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
if bsp.selfObservabilityEnabled {
bsp.spansProcessedCounter.Add(ctx, int64(l),
bsp.componentNameAttr,
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor))
}
err := bsp.e.ExportSpans(ctx, bsp.batch)
// A new batch is always created after exporting, even if the batch failed to be exported.
@@ -380,11 +453,17 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
case bsp.queue <- sd:
return true
case <-ctx.Done():
if bsp.selfObservabilityEnabled {
bsp.spansProcessedCounter.Add(ctx, 1,
bsp.componentNameAttr,
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
bsp.spansProcessedCounter.AttrErrorType(queueFull))
}
return false
}
}
-func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) bool {
+func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool {
if !sd.SpanContext().IsSampled() {
return false
}
@@ -394,6 +473,12 @@ func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) b
return true
default:
atomic.AddUint32(&bsp.dropped, 1)
if bsp.selfObservabilityEnabled {
bsp.spansProcessedCounter.Add(ctx, 1,
bsp.componentNameAttr,
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
bsp.spansProcessedCounter.AttrErrorType(queueFull))
}
}
return false
}


@@ -8,14 +8,25 @@ import (
"encoding/binary"
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/internal/env"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.36.0"
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
"go.opentelemetry.io/otel/trace"
)
@@ -633,3 +644,261 @@ func TestBatchSpanProcessorConcurrentSafe(t *testing.T) {
wg.Wait()
}
// Drop span metrics that are not under test in these tests.
var dropSpanMetricsView = sdkmetric.NewView(
sdkmetric.Instrument{
Name: "otel.sdk.span.*",
},
sdkmetric.Stream{Aggregation: sdkmetric.AggregationDrop{}},
)
func TestBatchSpanProcessorMetricsDisabled(t *testing.T) {
t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "false")
tp := basicTracerProvider(t)
reader := sdkmetric.NewManualReader()
meterProvider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(reader),
sdkmetric.WithView(dropSpanMetricsView),
)
otel.SetMeterProvider(meterProvider)
me := newBlockingExporter()
t.Cleanup(func() { assert.NoError(t, me.Shutdown(context.Background())) })
bsp := NewBatchSpanProcessor(
me,
// Make sure timeout doesn't trigger during the test.
WithBatchTimeout(time.Hour),
WithMaxQueueSize(2),
WithMaxExportBatchSize(2),
)
tp.RegisterSpanProcessor(bsp)
tr := tp.Tracer("TestBatchSpanProcessorMetricsDisabled")
// Generate 2 spans, which export and block during the export call.
generateSpan(t, tr, testOption{genNumSpans: 2})
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
assert.NoError(t, me.waitForSpans(ctx, 2))
// Validate that there are no metrics produced.
gotMetrics := new(metricdata.ResourceMetrics)
assert.NoError(t, reader.Collect(context.Background(), gotMetrics))
require.Empty(t, gotMetrics.ScopeMetrics)
// Generate 3 spans. 2 fill the queue, and 1 is dropped because the queue is full.
generateSpan(t, tr, testOption{genNumSpans: 3})
// Validate that there are no metrics produced.
gotMetrics = new(metricdata.ResourceMetrics)
assert.NoError(t, reader.Collect(context.Background(), gotMetrics))
require.Empty(t, gotMetrics.ScopeMetrics)
}
func TestBatchSpanProcessorMetrics(t *testing.T) {
t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true")
tp := basicTracerProvider(t)
reader := sdkmetric.NewManualReader()
meterProvider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(reader),
sdkmetric.WithView(dropSpanMetricsView),
)
otel.SetMeterProvider(meterProvider)
me := newBlockingExporter()
t.Cleanup(func() { assert.NoError(t, me.Shutdown(context.Background())) })
bsp := NewBatchSpanProcessor(
me,
// Make sure timeout doesn't trigger during the test.
WithBatchTimeout(time.Hour),
WithMaxQueueSize(2),
WithMaxExportBatchSize(2),
)
internalBsp := bsp.(*batchSpanProcessor)
tp.RegisterSpanProcessor(bsp)
tr := tp.Tracer("TestBatchSpanProcessorMetrics")
// Generate 2 spans, which export and block during the export call.
generateSpan(t, tr, testOption{genNumSpans: 2})
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
assert.NoError(t, me.waitForSpans(ctx, 2))
assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2})
// Generate 3 spans. 2 fill the queue, and 1 is dropped because the queue is full.
generateSpan(t, tr, testOption{genNumSpans: 3})
assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
expectMetrics{queueCapacity: 2, queueSize: 2, queueFullProcessed: 1, successProcessed: 2})
}
func TestBatchSpanProcessorBlockingMetrics(t *testing.T) {
t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true")
tp := basicTracerProvider(t)
reader := sdkmetric.NewManualReader()
meterProvider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(reader),
sdkmetric.WithView(dropSpanMetricsView),
)
otel.SetMeterProvider(meterProvider)
me := newBlockingExporter()
t.Cleanup(func() { assert.NoError(t, me.Shutdown(context.Background())) })
bsp := NewBatchSpanProcessor(
me,
// Use WithBlocking so we can trigger a queueFull using ForceFlush.
WithBlocking(),
// Make sure timeout doesn't trigger during the test.
WithBatchTimeout(time.Hour),
WithMaxQueueSize(2),
WithMaxExportBatchSize(2),
)
internalBsp := bsp.(*batchSpanProcessor)
tp.RegisterSpanProcessor(bsp)
tr := tp.Tracer("TestBatchSpanProcessorBlockingMetrics")
// Generate 2 spans that are exported to the exporter, which blocks.
generateSpan(t, tr, testOption{genNumSpans: 2})
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
assert.NoError(t, me.waitForSpans(ctx, 2))
assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2})
// Generate 2 spans to fill the queue.
generateSpan(t, tr, testOption{genNumSpans: 2})
go func() {
// Generate a span which blocks because the queue is full.
generateSpan(t, tr, testOption{genNumSpans: 1})
}()
assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
expectMetrics{queueCapacity: 2, queueSize: 2, successProcessed: 2})
// Use ForceFlush to force the span that is blocking on the full queue to be dropped.
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
assert.Error(t, tp.ForceFlush(ctx))
assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
expectMetrics{queueCapacity: 2, queueSize: 2, queueFullProcessed: 1, successProcessed: 2})
}
type expectMetrics struct {
queueCapacity int64
queueSize int64
successProcessed int64
queueFullProcessed int64
}
func assertSelfObsScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValue, reader sdkmetric.Reader,
expectation expectMetrics,
) {
t.Helper()
gotResourceMetrics := new(metricdata.ResourceMetrics)
assert.NoError(t, reader.Collect(context.Background(), gotResourceMetrics))
baseAttrs := attribute.NewSet(
semconv.OTelComponentTypeBatchingSpanProcessor,
componentNameAttr,
)
wantMetrics := []metricdata.Metrics{
{
Name: otelconv.SDKProcessorSpanQueueCapacity{}.Name(),
Description: otelconv.SDKProcessorSpanQueueCapacity{}.Description(),
Unit: otelconv.SDKProcessorSpanQueueCapacity{}.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{{Attributes: baseAttrs, Value: expectation.queueCapacity}},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
},
},
{
Name: otelconv.SDKProcessorSpanQueueSize{}.Name(),
Description: otelconv.SDKProcessorSpanQueueSize{}.Description(),
Unit: otelconv.SDKProcessorSpanQueueSize{}.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{{Attributes: baseAttrs, Value: expectation.queueSize}},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
},
},
}
wantProcessedDataPoints := []metricdata.DataPoint[int64]{}
if expectation.successProcessed > 0 {
wantProcessedDataPoints = append(wantProcessedDataPoints, metricdata.DataPoint[int64]{
Value: expectation.successProcessed,
Attributes: attribute.NewSet(
semconv.OTelComponentTypeBatchingSpanProcessor,
componentNameAttr,
),
})
}
if expectation.queueFullProcessed > 0 {
wantProcessedDataPoints = append(wantProcessedDataPoints, metricdata.DataPoint[int64]{
Value: expectation.queueFullProcessed,
Attributes: attribute.NewSet(
semconv.OTelComponentTypeBatchingSpanProcessor,
componentNameAttr,
semconv.ErrorTypeKey.String(string(queueFull)),
),
})
}
if len(wantProcessedDataPoints) > 0 {
wantMetrics = append(wantMetrics,
metricdata.Metrics{
Name: otelconv.SDKProcessorSpanProcessed{}.Name(),
Description: otelconv.SDKProcessorSpanProcessed{}.Description(),
Unit: otelconv.SDKProcessorSpanProcessed{}.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: wantProcessedDataPoints,
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
)
}
wantScopeMetric := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: "go.opentelemetry.io/otel/sdk/trace",
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
},
Metrics: wantMetrics,
}
metricdatatest.AssertEqual(t, wantScopeMetric, gotResourceMetrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp())
}
// blockingExporter blocks until the exported span is removed from the channel.
type blockingExporter struct {
shutdown chan struct{}
total atomic.Int32
}
func newBlockingExporter() *blockingExporter {
e := &blockingExporter{shutdown: make(chan struct{})}
return e
}
func (e *blockingExporter) Shutdown(ctx context.Context) error {
select {
case <-e.shutdown:
default:
close(e.shutdown)
}
return ctx.Err()
}
func (e *blockingExporter) ExportSpans(ctx context.Context, s []ReadOnlySpan) error {
e.total.Add(int32(len(s)))
<-e.shutdown
return ctx.Err()
}
func (e *blockingExporter) waitForSpans(ctx context.Context, n int32) error {
// Wait for all n spans to reach the export call
for e.total.Load() < n {
select {
case <-ctx.Done():
return fmt.Errorf("timeout waiting for %d spans to be exported", n)
default:
// So the select will not block
}
runtime.Gosched()
}
return nil
}


@@ -20,6 +20,7 @@ import (
const (
defaultTracerName = "go.opentelemetry.io/otel/sdk/tracer"
selfObsScopeName = "go.opentelemetry.io/otel/sdk/trace"
)
// tracerProviderConfig.


@@ -39,7 +39,7 @@ func (tr *tracer) initSelfObservability() {
tr.selfObservabilityEnabled = true
mp := otel.GetMeterProvider()
m := mp.Meter("go.opentelemetry.io/otel/sdk/trace",
m := mp.Meter(selfObsScopeName,
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL))