You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-11-29 23:07:45 +02:00
Encapsulate SDK BatchSpanProcessor observability (#7332)
Split from #7316
[Follow
guidelines](a5dcd68ebb/CONTRIBUTING.md (encapsulation))
and move instrumentation into its own type.
### Benchmarks
#### Added `sdk/trace/internal/observ` benchmarks
```
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/sdk/trace/internal/observ
cpu: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz
│ enc-trace-sdk-bsp-obs.out │
│ sec/op │
BSP/Processed-8 0.6394n ± 2%
BSP/ProcessedQueueFull-8 0.6806n ± 3%
BSP/Callback-8 3.591µ ± 12%
geomean 11.60n
│ enc-trace-sdk-bsp-obs.out │
│ B/op │
BSP/Processed-8 0.000 ± 0%
BSP/ProcessedQueueFull-8 0.000 ± 0%
BSP/Callback-8 2.626Ki ± 0%
geomean ¹
¹ summaries must be >0 to compute geomean
│ enc-trace-sdk-bsp-obs.out │
│ allocs/op │
BSP/Processed-8 0.000 ± 0%
BSP/ProcessedQueueFull-8 0.000 ± 0%
BSP/Callback-8 16.00 ± 0%
geomean ¹
¹ summaries must be >0 to compute geomean
```
#### Existing `sdk/trace` benchmarks
None
This commit is contained in:
@@ -6,20 +6,14 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
|
||||||
"go.opentelemetry.io/otel/internal/global"
|
"go.opentelemetry.io/otel/internal/global"
|
||||||
"go.opentelemetry.io/otel/metric"
|
|
||||||
"go.opentelemetry.io/otel/sdk"
|
|
||||||
"go.opentelemetry.io/otel/sdk/internal/env"
|
"go.opentelemetry.io/otel/sdk/internal/env"
|
||||||
"go.opentelemetry.io/otel/sdk/trace/internal/x"
|
"go.opentelemetry.io/otel/sdk/trace/internal/observ"
|
||||||
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
|
|
||||||
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
|
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -33,8 +27,6 @@ const (
|
|||||||
DefaultMaxExportBatchSize = 512
|
DefaultMaxExportBatchSize = 512
|
||||||
)
|
)
|
||||||
|
|
||||||
var queueFull = otelconv.ErrorTypeAttr("queue_full")
|
|
||||||
|
|
||||||
// BatchSpanProcessorOption configures a BatchSpanProcessor.
|
// BatchSpanProcessorOption configures a BatchSpanProcessor.
|
||||||
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
|
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
|
||||||
|
|
||||||
@@ -78,10 +70,7 @@ type batchSpanProcessor struct {
|
|||||||
queue chan ReadOnlySpan
|
queue chan ReadOnlySpan
|
||||||
dropped uint32
|
dropped uint32
|
||||||
|
|
||||||
observabilityEnabled bool
|
inst *observ.BSP
|
||||||
callbackRegistration metric.Registration
|
|
||||||
spansProcessedCounter otelconv.SDKProcessorSpanProcessed
|
|
||||||
componentNameAttr attribute.KeyValue
|
|
||||||
|
|
||||||
batch []ReadOnlySpan
|
batch []ReadOnlySpan
|
||||||
batchMutex sync.Mutex
|
batchMutex sync.Mutex
|
||||||
@@ -124,20 +113,15 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
|
|||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
if x.Observability.Enabled() {
|
|
||||||
bsp.observabilityEnabled = true
|
|
||||||
bsp.componentNameAttr = componentName()
|
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
bsp.spansProcessedCounter, bsp.callbackRegistration, err = newBSPObs(
|
bsp.inst, err = observ.NewBSP(
|
||||||
bsp.componentNameAttr,
|
nextProcessorID(),
|
||||||
func() int64 { return int64(len(bsp.queue)) },
|
func() int64 { return int64(len(bsp.queue)) },
|
||||||
int64(bsp.o.MaxQueueSize),
|
int64(bsp.o.MaxQueueSize),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
otel.Handle(err)
|
otel.Handle(err)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
bsp.stopWait.Add(1)
|
bsp.stopWait.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
@@ -157,51 +141,6 @@ func nextProcessorID() int64 {
|
|||||||
return processorIDCounter.Add(1) - 1
|
return processorIDCounter.Add(1) - 1
|
||||||
}
|
}
|
||||||
|
|
||||||
func componentName() attribute.KeyValue {
|
|
||||||
id := nextProcessorID()
|
|
||||||
name := fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, id)
|
|
||||||
return semconv.OTelComponentName(name)
|
|
||||||
}
|
|
||||||
|
|
||||||
// newBSPObs creates and returns a new set of metrics instruments and a
|
|
||||||
// registration for a BatchSpanProcessor. It is the caller's responsibility
|
|
||||||
// to unregister the registration when it is no longer needed.
|
|
||||||
func newBSPObs(
|
|
||||||
cmpnt attribute.KeyValue,
|
|
||||||
qLen func() int64,
|
|
||||||
qMax int64,
|
|
||||||
) (otelconv.SDKProcessorSpanProcessed, metric.Registration, error) {
|
|
||||||
meter := otel.GetMeterProvider().Meter(
|
|
||||||
obsScopeName,
|
|
||||||
metric.WithInstrumentationVersion(sdk.Version()),
|
|
||||||
metric.WithSchemaURL(semconv.SchemaURL),
|
|
||||||
)
|
|
||||||
|
|
||||||
qCap, err := otelconv.NewSDKProcessorSpanQueueCapacity(meter)
|
|
||||||
|
|
||||||
qSize, e := otelconv.NewSDKProcessorSpanQueueSize(meter)
|
|
||||||
err = errors.Join(err, e)
|
|
||||||
|
|
||||||
spansProcessed, e := otelconv.NewSDKProcessorSpanProcessed(meter)
|
|
||||||
err = errors.Join(err, e)
|
|
||||||
|
|
||||||
cmpntT := semconv.OTelComponentTypeBatchingSpanProcessor
|
|
||||||
attrs := metric.WithAttributes(cmpnt, cmpntT)
|
|
||||||
|
|
||||||
reg, e := meter.RegisterCallback(
|
|
||||||
func(_ context.Context, o metric.Observer) error {
|
|
||||||
o.ObserveInt64(qSize.Inst(), qLen(), attrs)
|
|
||||||
o.ObserveInt64(qCap.Inst(), qMax, attrs)
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
qSize.Inst(),
|
|
||||||
qCap.Inst(),
|
|
||||||
)
|
|
||||||
err = errors.Join(err, e)
|
|
||||||
|
|
||||||
return spansProcessed, reg, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnStart method does nothing.
|
// OnStart method does nothing.
|
||||||
func (*batchSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}
|
func (*batchSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}
|
||||||
|
|
||||||
@@ -242,8 +181,8 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
err = ctx.Err()
|
err = ctx.Err()
|
||||||
}
|
}
|
||||||
if bsp.observabilityEnabled {
|
if bsp.inst != nil {
|
||||||
err = errors.Join(err, bsp.callbackRegistration.Unregister())
|
err = errors.Join(err, bsp.inst.Shutdown())
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
@@ -357,10 +296,8 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
|
|||||||
|
|
||||||
if l := len(bsp.batch); l > 0 {
|
if l := len(bsp.batch); l > 0 {
|
||||||
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
|
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
|
||||||
if bsp.observabilityEnabled {
|
if bsp.inst != nil {
|
||||||
bsp.spansProcessedCounter.Add(ctx, int64(l),
|
bsp.inst.Processed(ctx, int64(l))
|
||||||
bsp.componentNameAttr,
|
|
||||||
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor))
|
|
||||||
}
|
}
|
||||||
err := bsp.e.ExportSpans(ctx, bsp.batch)
|
err := bsp.e.ExportSpans(ctx, bsp.batch)
|
||||||
|
|
||||||
@@ -470,11 +407,8 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
|
|||||||
case bsp.queue <- sd:
|
case bsp.queue <- sd:
|
||||||
return true
|
return true
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
if bsp.observabilityEnabled {
|
if bsp.inst != nil {
|
||||||
bsp.spansProcessedCounter.Add(ctx, 1,
|
bsp.inst.ProcessedQueueFull(ctx, 1)
|
||||||
bsp.componentNameAttr,
|
|
||||||
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
|
|
||||||
bsp.spansProcessedCounter.AttrErrorType(queueFull))
|
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@@ -490,11 +424,8 @@ func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan)
|
|||||||
return true
|
return true
|
||||||
default:
|
default:
|
||||||
atomic.AddUint32(&bsp.dropped, 1)
|
atomic.AddUint32(&bsp.dropped, 1)
|
||||||
if bsp.observabilityEnabled {
|
if bsp.inst != nil {
|
||||||
bsp.spansProcessedCounter.Add(ctx, 1,
|
bsp.inst.ProcessedQueueFull(ctx, 1)
|
||||||
bsp.componentNameAttr,
|
|
||||||
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
|
|
||||||
bsp.spansProcessedCounter.AttrErrorType(queueFull))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
|
|||||||
@@ -25,11 +25,14 @@ import (
|
|||||||
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
||||||
|
"go.opentelemetry.io/otel/sdk/trace/internal/observ"
|
||||||
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
|
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
|
||||||
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
|
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const componentID = 0
|
||||||
|
|
||||||
type testBatchExporter struct {
|
type testBatchExporter struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
spans []ReadOnlySpan
|
spans []ReadOnlySpan
|
||||||
@@ -693,6 +696,9 @@ func TestBatchSpanProcessorMetricsDisabled(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestBatchSpanProcessorMetrics(t *testing.T) {
|
func TestBatchSpanProcessorMetrics(t *testing.T) {
|
||||||
|
// Reset for deterministic component ID.
|
||||||
|
processorIDCounter.Store(componentID)
|
||||||
|
|
||||||
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||||
tp := basicTracerProvider(t)
|
tp := basicTracerProvider(t)
|
||||||
reader := sdkmetric.NewManualReader()
|
reader := sdkmetric.NewManualReader()
|
||||||
@@ -710,7 +716,6 @@ func TestBatchSpanProcessorMetrics(t *testing.T) {
|
|||||||
WithMaxQueueSize(2),
|
WithMaxQueueSize(2),
|
||||||
WithMaxExportBatchSize(2),
|
WithMaxExportBatchSize(2),
|
||||||
)
|
)
|
||||||
internalBsp := bsp.(*batchSpanProcessor)
|
|
||||||
tp.RegisterSpanProcessor(bsp)
|
tp.RegisterSpanProcessor(bsp)
|
||||||
|
|
||||||
tr := tp.Tracer("TestBatchSpanProcessorMetrics")
|
tr := tp.Tracer("TestBatchSpanProcessorMetrics")
|
||||||
@@ -719,15 +724,25 @@ func TestBatchSpanProcessorMetrics(t *testing.T) {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
assert.NoError(t, me.waitForSpans(ctx, 2))
|
assert.NoError(t, me.waitForSpans(ctx, 2))
|
||||||
assertObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
|
assertObsScopeMetrics(t, reader, expectMetrics{
|
||||||
expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2})
|
queueCapacity: 2,
|
||||||
|
queueSize: 0,
|
||||||
|
successProcessed: 2,
|
||||||
|
})
|
||||||
// Generate 3 spans. 2 fill the queue, and 1 is dropped because the queue is full.
|
// Generate 3 spans. 2 fill the queue, and 1 is dropped because the queue is full.
|
||||||
generateSpan(t, tr, testOption{genNumSpans: 3})
|
generateSpan(t, tr, testOption{genNumSpans: 3})
|
||||||
assertObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
|
assertObsScopeMetrics(t, reader, expectMetrics{
|
||||||
expectMetrics{queueCapacity: 2, queueSize: 2, queueFullProcessed: 1, successProcessed: 2})
|
queueCapacity: 2,
|
||||||
|
queueSize: 2,
|
||||||
|
queueFullProcessed: 1,
|
||||||
|
successProcessed: 2,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBatchSpanProcessorBlockingMetrics(t *testing.T) {
|
func TestBatchSpanProcessorBlockingMetrics(t *testing.T) {
|
||||||
|
// Reset for deterministic component ID.
|
||||||
|
processorIDCounter.Store(componentID)
|
||||||
|
|
||||||
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||||
tp := basicTracerProvider(t)
|
tp := basicTracerProvider(t)
|
||||||
reader := sdkmetric.NewManualReader()
|
reader := sdkmetric.NewManualReader()
|
||||||
@@ -747,7 +762,6 @@ func TestBatchSpanProcessorBlockingMetrics(t *testing.T) {
|
|||||||
WithMaxQueueSize(2),
|
WithMaxQueueSize(2),
|
||||||
WithMaxExportBatchSize(2),
|
WithMaxExportBatchSize(2),
|
||||||
)
|
)
|
||||||
internalBsp := bsp.(*batchSpanProcessor)
|
|
||||||
tp.RegisterSpanProcessor(bsp)
|
tp.RegisterSpanProcessor(bsp)
|
||||||
|
|
||||||
tr := tp.Tracer("TestBatchSpanProcessorBlockingMetrics")
|
tr := tp.Tracer("TestBatchSpanProcessorBlockingMetrics")
|
||||||
@@ -756,23 +770,33 @@ func TestBatchSpanProcessorBlockingMetrics(t *testing.T) {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
assert.NoError(t, me.waitForSpans(ctx, 2))
|
assert.NoError(t, me.waitForSpans(ctx, 2))
|
||||||
assertObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
|
assertObsScopeMetrics(t, reader, expectMetrics{
|
||||||
expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2})
|
queueCapacity: 2,
|
||||||
|
queueSize: 0,
|
||||||
|
successProcessed: 2,
|
||||||
|
})
|
||||||
// Generate 2 spans to fill the queue.
|
// Generate 2 spans to fill the queue.
|
||||||
generateSpan(t, tr, testOption{genNumSpans: 2})
|
generateSpan(t, tr, testOption{genNumSpans: 2})
|
||||||
go func() {
|
go func() {
|
||||||
// Generate a span which blocks because the queue is full.
|
// Generate a span which blocks because the queue is full.
|
||||||
generateSpan(t, tr, testOption{genNumSpans: 1})
|
generateSpan(t, tr, testOption{genNumSpans: 1})
|
||||||
}()
|
}()
|
||||||
assertObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
|
assertObsScopeMetrics(t, reader, expectMetrics{
|
||||||
expectMetrics{queueCapacity: 2, queueSize: 2, successProcessed: 2})
|
queueCapacity: 2,
|
||||||
|
queueSize: 2,
|
||||||
|
successProcessed: 2,
|
||||||
|
})
|
||||||
|
|
||||||
// Use ForceFlush to force the span that is blocking on the full queue to be dropped.
|
// Use ForceFlush to force the span that is blocking on the full queue to be dropped.
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond)
|
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
assert.Error(t, tp.ForceFlush(ctx))
|
assert.Error(t, tp.ForceFlush(ctx))
|
||||||
assertObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
|
assertObsScopeMetrics(t, reader, expectMetrics{
|
||||||
expectMetrics{queueCapacity: 2, queueSize: 2, queueFullProcessed: 1, successProcessed: 2})
|
queueCapacity: 2,
|
||||||
|
queueSize: 2,
|
||||||
|
queueFullProcessed: 1,
|
||||||
|
successProcessed: 2,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type expectMetrics struct {
|
type expectMetrics struct {
|
||||||
@@ -782,13 +806,16 @@ type expectMetrics struct {
|
|||||||
queueFullProcessed int64
|
queueFullProcessed int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func assertObsScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValue, reader sdkmetric.Reader,
|
func assertObsScopeMetrics(
|
||||||
|
t *testing.T,
|
||||||
|
reader sdkmetric.Reader,
|
||||||
expectation expectMetrics,
|
expectation expectMetrics,
|
||||||
) {
|
) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
gotResourceMetrics := new(metricdata.ResourceMetrics)
|
gotResourceMetrics := new(metricdata.ResourceMetrics)
|
||||||
assert.NoError(t, reader.Collect(context.Background(), gotResourceMetrics))
|
assert.NoError(t, reader.Collect(context.Background(), gotResourceMetrics))
|
||||||
|
|
||||||
|
componentNameAttr := observ.BSPComponentName(componentID)
|
||||||
baseAttrs := attribute.NewSet(
|
baseAttrs := attribute.NewSet(
|
||||||
semconv.OTelComponentTypeBatchingSpanProcessor,
|
semconv.OTelComponentTypeBatchingSpanProcessor,
|
||||||
componentNameAttr,
|
componentNameAttr,
|
||||||
@@ -832,7 +859,7 @@ func assertObsScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValue, r
|
|||||||
Attributes: attribute.NewSet(
|
Attributes: attribute.NewSet(
|
||||||
semconv.OTelComponentTypeBatchingSpanProcessor,
|
semconv.OTelComponentTypeBatchingSpanProcessor,
|
||||||
componentNameAttr,
|
componentNameAttr,
|
||||||
semconv.ErrorTypeKey.String(string(queueFull)),
|
observ.ErrQueueFull,
|
||||||
),
|
),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -854,9 +881,9 @@ func assertObsScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValue, r
|
|||||||
|
|
||||||
wantScopeMetric := metricdata.ScopeMetrics{
|
wantScopeMetric := metricdata.ScopeMetrics{
|
||||||
Scope: instrumentation.Scope{
|
Scope: instrumentation.Scope{
|
||||||
Name: "go.opentelemetry.io/otel/sdk/trace",
|
Name: observ.ScopeName,
|
||||||
Version: sdk.Version(),
|
Version: sdk.Version(),
|
||||||
SchemaURL: semconv.SchemaURL,
|
SchemaURL: observ.SchemaURL,
|
||||||
},
|
},
|
||||||
Metrics: wantMetrics,
|
Metrics: wantMetrics,
|
||||||
}
|
}
|
||||||
|
|||||||
119
sdk/trace/internal/observ/batch_span_processor.go
Normal file
119
sdk/trace/internal/observ/batch_span_processor.go
Normal file
@@ -0,0 +1,119 @@
|
|||||||
|
// Copyright The OpenTelemetry Authors
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
package observ // import "go.opentelemetry.io/otel/sdk/trace/internal/observ"
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/metric"
|
||||||
|
"go.opentelemetry.io/otel/sdk"
|
||||||
|
"go.opentelemetry.io/otel/sdk/trace/internal/x"
|
||||||
|
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
|
||||||
|
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// ScopeName is the name of the instrumentation scope.
|
||||||
|
ScopeName = "go.opentelemetry.io/otel/sdk/trace/internal/observ"
|
||||||
|
|
||||||
|
// SchemaURL is the schema URL of the instrumentation.
|
||||||
|
SchemaURL = semconv.SchemaURL
|
||||||
|
)
|
||||||
|
|
||||||
|
// ErrQueueFull is the attribute value for the "queue_full" error type.
|
||||||
|
var ErrQueueFull = otelconv.SDKProcessorSpanProcessed{}.AttrErrorType(
|
||||||
|
otelconv.ErrorTypeAttr("queue_full"),
|
||||||
|
)
|
||||||
|
|
||||||
|
// BSPComponentName returns the component name attribute for a
|
||||||
|
// BatchSpanProcessor with the given ID.
|
||||||
|
func BSPComponentName(id int64) attribute.KeyValue {
|
||||||
|
t := otelconv.ComponentTypeBatchingSpanProcessor
|
||||||
|
name := fmt.Sprintf("%s/%d", t, id)
|
||||||
|
return semconv.OTelComponentName(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// BSP is the instrumentation for an OTel SDK BatchSpanProcessor.
|
||||||
|
type BSP struct {
|
||||||
|
reg metric.Registration
|
||||||
|
|
||||||
|
processed metric.Int64Counter
|
||||||
|
processedOpts []metric.AddOption
|
||||||
|
processedQueueFullOpts []metric.AddOption
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBSP(id int64, qLen func() int64, qMax int64) (*BSP, error) {
|
||||||
|
if !x.Observability.Enabled() {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
meter := otel.GetMeterProvider().Meter(
|
||||||
|
ScopeName,
|
||||||
|
metric.WithInstrumentationVersion(sdk.Version()),
|
||||||
|
metric.WithSchemaURL(SchemaURL),
|
||||||
|
)
|
||||||
|
|
||||||
|
qCap, err := otelconv.NewSDKProcessorSpanQueueCapacity(meter)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("failed to create BSP queue capacity metric: %w", err)
|
||||||
|
}
|
||||||
|
qCapInst := qCap.Inst()
|
||||||
|
|
||||||
|
qSize, e := otelconv.NewSDKProcessorSpanQueueSize(meter)
|
||||||
|
if e != nil {
|
||||||
|
e := fmt.Errorf("failed to create BSP queue size metric: %w", e)
|
||||||
|
err = errors.Join(err, e)
|
||||||
|
}
|
||||||
|
qSizeInst := qSize.Inst()
|
||||||
|
|
||||||
|
cmpntT := semconv.OTelComponentTypeBatchingSpanProcessor
|
||||||
|
cmpnt := BSPComponentName(id)
|
||||||
|
set := attribute.NewSet(cmpnt, cmpntT)
|
||||||
|
|
||||||
|
obsOpts := []metric.ObserveOption{metric.WithAttributeSet(set)}
|
||||||
|
reg, e := meter.RegisterCallback(
|
||||||
|
func(_ context.Context, o metric.Observer) error {
|
||||||
|
o.ObserveInt64(qSizeInst, qLen(), obsOpts...)
|
||||||
|
o.ObserveInt64(qCapInst, qMax, obsOpts...)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
qSizeInst,
|
||||||
|
qCapInst,
|
||||||
|
)
|
||||||
|
if e != nil {
|
||||||
|
e := fmt.Errorf("failed to register BSP queue size/capacity callback: %w", e)
|
||||||
|
err = errors.Join(err, e)
|
||||||
|
}
|
||||||
|
|
||||||
|
processed, e := otelconv.NewSDKProcessorSpanProcessed(meter)
|
||||||
|
if e != nil {
|
||||||
|
e := fmt.Errorf("failed to create BSP processed spans metric: %w", e)
|
||||||
|
err = errors.Join(err, e)
|
||||||
|
}
|
||||||
|
processedOpts := []metric.AddOption{metric.WithAttributeSet(set)}
|
||||||
|
|
||||||
|
set = attribute.NewSet(cmpnt, cmpntT, ErrQueueFull)
|
||||||
|
processedQueueFullOpts := []metric.AddOption{metric.WithAttributeSet(set)}
|
||||||
|
|
||||||
|
return &BSP{
|
||||||
|
reg: reg,
|
||||||
|
processed: processed.Inst(),
|
||||||
|
processedOpts: processedOpts,
|
||||||
|
processedQueueFullOpts: processedQueueFullOpts,
|
||||||
|
}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BSP) Shutdown() error { return b.reg.Unregister() }
|
||||||
|
|
||||||
|
func (b *BSP) Processed(ctx context.Context, n int64) {
|
||||||
|
b.processed.Add(ctx, n, b.processedOpts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BSP) ProcessedQueueFull(ctx context.Context, n int64) {
|
||||||
|
b.processed.Add(ctx, n, b.processedQueueFullOpts...)
|
||||||
|
}
|
||||||
223
sdk/trace/internal/observ/batch_span_processor_test.go
Normal file
223
sdk/trace/internal/observ/batch_span_processor_test.go
Normal file
@@ -0,0 +1,223 @@
|
|||||||
|
// Copyright The OpenTelemetry Authors
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
package observ_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/metric/noop"
|
||||||
|
"go.opentelemetry.io/otel/sdk/metric"
|
||||||
|
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||||
|
"go.opentelemetry.io/otel/sdk/trace/internal/observ"
|
||||||
|
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
|
||||||
|
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
const id = 0
|
||||||
|
|
||||||
|
func TestBSPComponentName(t *testing.T) {
|
||||||
|
got := observ.BSPComponentName(42)
|
||||||
|
want := semconv.OTelComponentName("batching_span_processor/42")
|
||||||
|
assert.Equal(t, want, got)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewBSPDisabled(t *testing.T) {
|
||||||
|
// Do not set OTEL_GO_X_OBSERVABILITY
|
||||||
|
bsp, err := observ.NewBSP(id, nil, 0)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Nil(t, bsp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewBSPErrors(t *testing.T) {
|
||||||
|
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||||
|
|
||||||
|
orig := otel.GetMeterProvider()
|
||||||
|
t.Cleanup(func() { otel.SetMeterProvider(orig) })
|
||||||
|
|
||||||
|
mp := &errMeterProvider{err: assert.AnError}
|
||||||
|
otel.SetMeterProvider(mp)
|
||||||
|
|
||||||
|
_, err := observ.NewBSP(id, nil, 0)
|
||||||
|
require.ErrorIs(t, err, assert.AnError, "new instrument errors")
|
||||||
|
|
||||||
|
assert.ErrorContains(t, err, "create BSP queue capacity metric")
|
||||||
|
assert.ErrorContains(t, err, "create BSP queue size metric")
|
||||||
|
assert.ErrorContains(t, err, "register BSP queue size/capacity callback")
|
||||||
|
assert.ErrorContains(t, err, "create BSP processed spans metric")
|
||||||
|
}
|
||||||
|
|
||||||
|
func bspSet(attrs ...attribute.KeyValue) attribute.Set {
|
||||||
|
return attribute.NewSet(append([]attribute.KeyValue{
|
||||||
|
semconv.OTelComponentTypeBatchingSpanProcessor,
|
||||||
|
observ.BSPComponentName(id),
|
||||||
|
}, attrs...)...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func qCap(v int64) metricdata.Metrics {
|
||||||
|
return metricdata.Metrics{
|
||||||
|
Name: otelconv.SDKProcessorSpanQueueCapacity{}.Name(),
|
||||||
|
Description: otelconv.SDKProcessorSpanQueueCapacity{}.Description(),
|
||||||
|
Unit: otelconv.SDKProcessorSpanQueueCapacity{}.Unit(),
|
||||||
|
Data: metricdata.Sum[int64]{
|
||||||
|
Temporality: metricdata.CumulativeTemporality,
|
||||||
|
IsMonotonic: false,
|
||||||
|
DataPoints: []metricdata.DataPoint[int64]{
|
||||||
|
{Attributes: bspSet(), Value: v},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func qSize(v int64) metricdata.Metrics {
|
||||||
|
return metricdata.Metrics{
|
||||||
|
Name: otelconv.SDKProcessorSpanQueueSize{}.Name(),
|
||||||
|
Description: otelconv.SDKProcessorSpanQueueSize{}.Description(),
|
||||||
|
Unit: otelconv.SDKProcessorSpanQueueSize{}.Unit(),
|
||||||
|
Data: metricdata.Sum[int64]{
|
||||||
|
Temporality: metricdata.CumulativeTemporality,
|
||||||
|
IsMonotonic: false,
|
||||||
|
DataPoints: []metricdata.DataPoint[int64]{
|
||||||
|
{Attributes: bspSet(), Value: v},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBSPCallback(t *testing.T) {
|
||||||
|
collect := setup(t)
|
||||||
|
|
||||||
|
var n int64 = 3
|
||||||
|
bsp, err := observ.NewBSP(id, func() int64 { return n }, 5)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, bsp)
|
||||||
|
|
||||||
|
check(t, collect(), qSize(n), qCap(5))
|
||||||
|
|
||||||
|
n = 4
|
||||||
|
check(t, collect(), qSize(n), qCap(5))
|
||||||
|
|
||||||
|
require.NoError(t, bsp.Shutdown())
|
||||||
|
got := collect()
|
||||||
|
assert.Empty(t, got.Metrics, "no metrics after shutdown")
|
||||||
|
}
|
||||||
|
|
||||||
|
func processed(dPts ...metricdata.DataPoint[int64]) metricdata.Metrics {
|
||||||
|
return metricdata.Metrics{
|
||||||
|
Name: otelconv.SDKProcessorSpanProcessed{}.Name(),
|
||||||
|
Description: otelconv.SDKProcessorSpanProcessed{}.Description(),
|
||||||
|
Unit: otelconv.SDKProcessorSpanProcessed{}.Unit(),
|
||||||
|
Data: metricdata.Sum[int64]{
|
||||||
|
Temporality: metricdata.CumulativeTemporality,
|
||||||
|
IsMonotonic: true,
|
||||||
|
DataPoints: dPts,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBSPProcessed(t *testing.T) {
|
||||||
|
collect := setup(t)
|
||||||
|
|
||||||
|
bsp, err := observ.NewBSP(id, nil, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, bsp)
|
||||||
|
require.NoError(t, bsp.Shutdown()) // Unregister callback.
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
const p0 int64 = 10
|
||||||
|
bsp.Processed(ctx, p0)
|
||||||
|
const e0 int64 = 1
|
||||||
|
bsp.ProcessedQueueFull(ctx, e0)
|
||||||
|
check(t, collect(), processed(
|
||||||
|
dPt(bspSet(), p0),
|
||||||
|
dPt(bspSet(observ.ErrQueueFull), e0),
|
||||||
|
))
|
||||||
|
|
||||||
|
const p1 int64 = 20
|
||||||
|
bsp.Processed(ctx, p1)
|
||||||
|
const e1 int64 = 2
|
||||||
|
bsp.ProcessedQueueFull(ctx, e1)
|
||||||
|
check(t, collect(), processed(
|
||||||
|
dPt(bspSet(), p0+p1),
|
||||||
|
dPt(bspSet(observ.ErrQueueFull), e0+e1),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkBSP(b *testing.B) {
|
||||||
|
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||||
|
|
||||||
|
newBSP := func(b *testing.B) *observ.BSP {
|
||||||
|
b.Helper()
|
||||||
|
bsp, err := observ.NewBSP(id, func() int64 { return 3 }, 5)
|
||||||
|
require.NoError(b, err)
|
||||||
|
require.NotNil(b, bsp)
|
||||||
|
b.Cleanup(func() {
|
||||||
|
if err := bsp.Shutdown(); err != nil {
|
||||||
|
b.Errorf("Shutdown: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return bsp
|
||||||
|
}
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
b.Run("Processed", func(b *testing.B) {
|
||||||
|
orig := otel.GetMeterProvider()
|
||||||
|
b.Cleanup(func() { otel.SetMeterProvider(orig) })
|
||||||
|
|
||||||
|
// Ensure deterministic benchmark by using noop meter.
|
||||||
|
otel.SetMeterProvider(noop.NewMeterProvider())
|
||||||
|
|
||||||
|
bsp := newBSP(b)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
for pb.Next() {
|
||||||
|
bsp.Processed(ctx, 10)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
b.Run("ProcessedQueueFull", func(b *testing.B) {
|
||||||
|
orig := otel.GetMeterProvider()
|
||||||
|
b.Cleanup(func() { otel.SetMeterProvider(orig) })
|
||||||
|
|
||||||
|
// Ensure deterministic benchmark by using noop meter.
|
||||||
|
otel.SetMeterProvider(noop.NewMeterProvider())
|
||||||
|
|
||||||
|
bsp := newBSP(b)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
for pb.Next() {
|
||||||
|
bsp.ProcessedQueueFull(ctx, 1)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
b.Run("Callback", func(b *testing.B) {
|
||||||
|
orig := otel.GetMeterProvider()
|
||||||
|
b.Cleanup(func() { otel.SetMeterProvider(orig) })
|
||||||
|
|
||||||
|
reader := metric.NewManualReader()
|
||||||
|
mp := metric.NewMeterProvider(metric.WithReader(reader))
|
||||||
|
otel.SetMeterProvider(mp)
|
||||||
|
|
||||||
|
bsp := newBSP(b)
|
||||||
|
var got metricdata.ResourceMetrics
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
b.ReportAllocs()
|
||||||
|
for b.Loop() {
|
||||||
|
_ = reader.Collect(ctx, &got)
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = got
|
||||||
|
_ = bsp
|
||||||
|
})
|
||||||
|
}
|
||||||
6
sdk/trace/internal/observ/doc.go
Normal file
6
sdk/trace/internal/observ/doc.go
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
// Copyright The OpenTelemetry Authors
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
// Package observ provides observability instrumentation for the OTel trace SDK
|
||||||
|
// package.
|
||||||
|
package observ // import "go.opentelemetry.io/otel/sdk/trace/internal/observ"
|
||||||
99
sdk/trace/internal/observ/observ_test.go
Normal file
99
sdk/trace/internal/observ/observ_test.go
Normal file
@@ -0,0 +1,99 @@
|
|||||||
|
// Copyright The OpenTelemetry Authors
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
package observ_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
mapi "go.opentelemetry.io/otel/metric"
|
||||||
|
"go.opentelemetry.io/otel/sdk"
|
||||||
|
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||||
|
"go.opentelemetry.io/otel/sdk/metric"
|
||||||
|
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||||
|
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
||||||
|
"go.opentelemetry.io/otel/sdk/trace/internal/observ"
|
||||||
|
)
|
||||||
|
|
||||||
|
func setup(t *testing.T) func() metricdata.ScopeMetrics {
|
||||||
|
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||||
|
|
||||||
|
orig := otel.GetMeterProvider()
|
||||||
|
t.Cleanup(func() { otel.SetMeterProvider(orig) })
|
||||||
|
|
||||||
|
reader := metric.NewManualReader()
|
||||||
|
mp := metric.NewMeterProvider(metric.WithReader(reader))
|
||||||
|
otel.SetMeterProvider(mp)
|
||||||
|
|
||||||
|
return func() metricdata.ScopeMetrics {
|
||||||
|
var got metricdata.ResourceMetrics
|
||||||
|
require.NoError(t, reader.Collect(context.Background(), &got))
|
||||||
|
if len(got.ScopeMetrics) != 1 {
|
||||||
|
return metricdata.ScopeMetrics{}
|
||||||
|
}
|
||||||
|
return got.ScopeMetrics[0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func scopeMetrics(metrics ...metricdata.Metrics) metricdata.ScopeMetrics {
|
||||||
|
return metricdata.ScopeMetrics{
|
||||||
|
Scope: instrumentation.Scope{
|
||||||
|
Name: observ.ScopeName,
|
||||||
|
Version: sdk.Version(),
|
||||||
|
SchemaURL: observ.SchemaURL,
|
||||||
|
},
|
||||||
|
Metrics: metrics,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func check(t *testing.T, got metricdata.ScopeMetrics, want ...metricdata.Metrics) {
|
||||||
|
o := []metricdatatest.Option{
|
||||||
|
metricdatatest.IgnoreTimestamp(),
|
||||||
|
metricdatatest.IgnoreExemplars(),
|
||||||
|
}
|
||||||
|
metricdatatest.AssertEqual(t, scopeMetrics(want...), got, o...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func dPt(set attribute.Set, value int64) metricdata.DataPoint[int64] {
|
||||||
|
return metricdata.DataPoint[int64]{Attributes: set, Value: value}
|
||||||
|
}
|
||||||
|
|
||||||
|
type errMeterProvider struct {
|
||||||
|
mapi.MeterProvider
|
||||||
|
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter {
|
||||||
|
return &errMeter{err: m.err}
|
||||||
|
}
|
||||||
|
|
||||||
|
type errMeter struct {
|
||||||
|
mapi.Meter
|
||||||
|
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *errMeter) Int64UpDownCounter(string, ...mapi.Int64UpDownCounterOption) (mapi.Int64UpDownCounter, error) {
|
||||||
|
return nil, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) {
|
||||||
|
return nil, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *errMeter) Int64ObservableUpDownCounter(
|
||||||
|
string,
|
||||||
|
...mapi.Int64ObservableUpDownCounterOption,
|
||||||
|
) (mapi.Int64ObservableUpDownCounter, error) {
|
||||||
|
return nil, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *errMeter) RegisterCallback(mapi.Callback, ...mapi.Observable) (mapi.Registration, error) {
|
||||||
|
return nil, m.err
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user