1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-10-31 00:07:40 +02:00

feat: sdk/trace: span processed metric for simple span processor (#7374)

Fixes #7004 

This PR adds support for experimental otel.sdk.processor.span.processed
metric in simple span processor.
Definition of metric at:
https://github.com/open-telemetry/semantic-conventions/blob/v1.36.0/docs/otel/sdk-metrics.md

Experimental metrics are behind a feature flag:
`OTEL_GO_X_OBSERVABILITY`

<details>
<summary>Observability Implementation Checklist</summary>

## Observability Implementation Checklist
Based on the [project Observability
guidelines](e4ab314112/CONTRIBUTING.md (observability)),
ensure the following are completed:

### Environment Variable Activation
* [x] Observability features are disabled by default
* [x] Features are activated through the `OTEL_GO_X_OBSERVABILITY`
environment variable
* [x] Use consistent pattern with `x.Observability.Enabled()` check [^1]
* [x] Follow established experimental feature pattern [^2][^3]

[^1]:
e4ab314112/exporters/stdout/stdouttrace/internal/observ/instrumentation.go (L101-L103)
[^2]:
e4ab314112/exporters/stdout/stdouttrace/internal/x/x.go
[^3]:
e4ab314112/sdk/internal/x/x.go

### Encapsulation
* [x] Instrumentation is encapsulated within a dedicated `struct` (e.g.,
[`Instrumentation`](e4ab314112/exporters/stdout/stdouttrace/internal/observ/instrumentation.go (L86-L94)))
* [x] Instrumentation is not mixed into the instrumented component
* [x] Instrumentation code is in its own file or package if
complex/reused
* [x] Instrumentation setup doesn't bloat the main component code

### Initialization
* [x] Initialization is only done when observability is enabled
* [x] Setup is explicit and side-effect free
* [x] Return errors from initialization when appropriate
* [x] Use the global Meter provider (e.g., `otel.GetMeterProvider()`)
* [x] Include proper meter configuration with:
  * [x] Instrumentation package name is used for the Meter
* [x] Instrumentation version (e.g.
[`Version`](e4ab314112/exporters/stdout/stdouttrace/internal/observ/instrumentation.go (L40-L43)))
* [x] Schema URL (e.g.
[`SchemaURL`](e4ab314112/exporters/stdout/stdouttrace/internal/observ/instrumentation.go (L36-L38)))

### Performance
* [x] Little to no overhead when observability is disabled
* [x] Expensive operations are only executed when observability is
enabled
* [x] When enabled, instrumentation code paths are optimized to reduce
allocation/computation overhead

#### Attribute and Option Allocation Management
* [x] Use `sync.Pool` for attribute slices and options with dynamic
attributes
* [x] Pool objects are properly reset before returning to pool
* [x] Pools are scoped for maximum efficiency while ensuring correctness

#### Caching
* [x] Static attribute sets known at compile time are pre-computed and
cached
* [x] Common attribute combinations use lookup tables/maps

#### Benchmarking
* [x] Benchmarks provided for all instrumentation code
* [ ] Benchmark scenarios include both enabled and disabled
observability
* [x] Benchmark results show impact on allocs/op, B/op, and ns/op (use
`b.ReportAllocs()` in benchmarks)

### Error Handling and Robustness
* [x] Errors are reported back to caller when possible
* [x] Partial failures are handled gracefully
* [x] Use partially initialized components when available
* [x] Return errors to caller instead of only using `otel.Handle()`
* [x] Use `otel.Handle()` only when component cannot report error to
user

### Context Propagation
* [x] Observability measurements receive the context from the function
being measured (don't break context propagation by using
`context.Background()`)

### Semantic Conventions Compliance
* [x] All metrics follow [OpenTelemetry Semantic
Conventions](5ee549b1ce/docs/otel/sdk-metrics.md)
* [x] Use the
[`otelconv`](https://pkg.go.dev/go.opentelemetry.io/otel@v1.38.0/semconv/v1.37.0/otelconv)
convenience package for metric semantic conventions
* [x] Component names follow semantic conventions
* [x] Use package path scope type as stable identifier for non-standard
components
* [x] Component names are stable unique identifiers
* [x] Use global counter for uniqueness if necessary
* [x] Component ID counter is resettable for deterministic testing

### Testing
* [x] Use deterministic testing with isolated state
* [x] Restore previous state after tests (`t.Cleanup()`)
* [x] Isolate meter provider for testing
* [x] Use `t.Setenv()` for environment variable testing
* [x] Reset component ID counter for deterministic component names
* [x] Test order doesn't affect results
</details>

### Benchmarks

```console
> benchstat bmark.result
goos: darwin
goarch: arm64
pkg: go.opentelemetry.io/otel/sdk/trace/internal/observ
cpu: Apple M1 Pro
                             │ bmark.result │
                             │    sec/op    │
SSP/SpanProcessed-8            146.7n ± 15%
SSP/SpanProcessedWithError-8   205.1n ±  3%
geomean                        173.5n

                             │ bmark.result │
                             │     B/op     │
SSP/SpanProcessed-8              280.0 ± 0%
SSP/SpanProcessedWithError-8     408.0 ± 0%
geomean                          338.0

                             │ bmark.result │
                             │  allocs/op   │
SSP/SpanProcessed-8              3.000 ± 0%
SSP/SpanProcessedWithError-8     3.000 ± 0%
geomean                          3.000
```

---------

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
Mahendra Bishnoi
2025-10-18 04:27:35 +05:30
committed by GitHub
parent 2e5fdd15ba
commit f7d2882606
5 changed files with 415 additions and 1 deletions

View File

@@ -19,6 +19,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`. (#7353)
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#7459)
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#7486)
- Add experimental observability metrics for simple span processor in `go.opentelemetry.io/otel/sdk/trace`. (#7374)
### Fixed

View File

@@ -0,0 +1,97 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package observ // import "go.opentelemetry.io/otel/sdk/trace/internal/observ"
import (
"context"
"fmt"
"sync"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/internal/x"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)
var measureAttrsPool = sync.Pool{
New: func() any {
// "component.name" + "component.type" + "error.type"
const n = 1 + 1 + 1
s := make([]attribute.KeyValue, 0, n)
// Return a pointer to a slice instead of a slice itself
// to avoid allocations on every call.
return &s
},
}
// SSP is the instrumentation for an OTel SDK SimpleSpanProcessor.
type SSP struct {
spansProcessedCounter metric.Int64Counter
addOpts []metric.AddOption
attrs []attribute.KeyValue
}
// SSPComponentName returns the component name attribute for a
// SimpleSpanProcessor with the given ID.
func SSPComponentName(id int64) attribute.KeyValue {
t := otelconv.ComponentTypeSimpleSpanProcessor
name := fmt.Sprintf("%s/%d", t, id)
return semconv.OTelComponentName(name)
}
// NewSSP returns instrumentation for an OTel SDK SimpleSpanProcessor with the
// provided ID.
//
// If the experimental observability is disabled, nil is returned.
func NewSSP(id int64) (*SSP, error) {
if !x.Observability.Enabled() {
return nil, nil
}
meter := otel.GetMeterProvider().Meter(
ScopeName,
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(SchemaURL),
)
spansProcessedCounter, err := otelconv.NewSDKProcessorSpanProcessed(meter)
if err != nil {
err = fmt.Errorf("failed to create SSP processed spans metric: %w", err)
}
componentName := SSPComponentName(id)
componentType := spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeSimpleSpanProcessor)
attrs := []attribute.KeyValue{componentName, componentType}
addOpts := []metric.AddOption{metric.WithAttributeSet(attribute.NewSet(attrs...))}
return &SSP{
spansProcessedCounter: spansProcessedCounter.Inst(),
addOpts: addOpts,
attrs: attrs,
}, err
}
// SpanProcessed records that a span has been processed by the SimpleSpanProcessor.
// If err is non-nil, it records the processing error as an attribute.
func (ssp *SSP) SpanProcessed(ctx context.Context, err error) {
ssp.spansProcessedCounter.Add(ctx, 1, ssp.addOption(err)...)
}
func (ssp *SSP) addOption(err error) []metric.AddOption {
if err == nil {
return ssp.addOpts
}
attrs := measureAttrsPool.Get().(*[]attribute.KeyValue)
defer func() {
*attrs = (*attrs)[:0] // reset the slice for reuse
measureAttrsPool.Put(attrs)
}()
*attrs = append(*attrs, ssp.attrs...)
*attrs = append(*attrs, semconv.ErrorType(err))
// Do not inefficiently make a copy of attrs by using
// WithAttributes instead of WithAttributeSet.
return []metric.AddOption{metric.WithAttributeSet(attribute.NewSet(*attrs...))}
}

View File

@@ -0,0 +1,127 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package observ_test
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/trace/internal/observ"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
)
const sspComponentID = 0
func TestSSPComponentName(t *testing.T) {
got := observ.SSPComponentName(10)
want := semconv.OTelComponentName("simple_span_processor/10")
assert.Equal(t, want, got)
}
func TestNewSSPError(t *testing.T) {
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
orig := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(orig) })
mp := &errMeterProvider{err: assert.AnError}
otel.SetMeterProvider(mp)
_, err := observ.NewSSP(sspComponentID)
require.ErrorIs(t, err, assert.AnError, "new instrument errors")
assert.ErrorContains(t, err, "create SSP processed spans metric")
}
func TestNewSSPDisabled(t *testing.T) {
ssp, err := observ.NewSSP(sspComponentID)
assert.NoError(t, err)
assert.Nil(t, ssp)
}
func TestSSPSpanProcessed(t *testing.T) {
ctx := t.Context()
collect := setup(t)
ssp, err := observ.NewSSP(sspComponentID)
assert.NoError(t, err)
ssp.SpanProcessed(ctx, nil)
check(t, collect(), processed(dPt(sspSet(), 1)))
ssp.SpanProcessed(ctx, nil)
ssp.SpanProcessed(ctx, nil)
check(t, collect(), processed(dPt(sspSet(), 3)))
processErr := errors.New("error processing span")
ssp.SpanProcessed(ctx, processErr)
check(t, collect(), processed(
dPt(sspSet(), 3),
dPt(sspSet(semconv.ErrorType(processErr)), 1),
))
}
func BenchmarkSSP(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
newSSP := func(b *testing.B) *observ.SSP {
b.Helper()
ssp, err := observ.NewSSP(sspComponentID)
require.NoError(b, err)
require.NotNil(b, ssp)
return ssp
}
b.Run("SpanProcessed", func(b *testing.B) {
orig := otel.GetMeterProvider()
b.Cleanup(func() {
otel.SetMeterProvider(orig)
})
// Ensure deterministic benchmark by using noop meter.
otel.SetMeterProvider(noop.NewMeterProvider())
ssp := newSSP(b)
ctx := b.Context()
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ssp.SpanProcessed(ctx, nil)
}
})
})
b.Run("SpanProcessedWithError", func(b *testing.B) {
orig := otel.GetMeterProvider()
b.Cleanup(func() {
otel.SetMeterProvider(orig)
})
// Ensure deterministic benchmark by using noop meter.
otel.SetMeterProvider(noop.NewMeterProvider())
ssp := newSSP(b)
ctx := b.Context()
processErr := errors.New("error processing span")
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ssp.SpanProcessed(ctx, processErr)
}
})
})
}
func sspSet(attrs ...attribute.KeyValue) attribute.Set {
return attribute.NewSet(append([]attribute.KeyValue{
semconv.OTelComponentTypeSimpleSpanProcessor,
observ.SSPComponentName(sspComponentID),
}, attrs...)...)
}

View File

@@ -6,9 +6,12 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
import (
"context"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/trace/internal/observ"
"go.opentelemetry.io/otel/trace"
)
// simpleSpanProcessor is a SpanProcessor that synchronously sends all
@@ -17,6 +20,8 @@ type simpleSpanProcessor struct {
exporterMu sync.Mutex
exporter SpanExporter
stopOnce sync.Once
inst *observ.SSP
}
var _ SpanProcessor = (*simpleSpanProcessor)(nil)
@@ -33,11 +38,26 @@ func NewSimpleSpanProcessor(exporter SpanExporter) SpanProcessor {
ssp := &simpleSpanProcessor{
exporter: exporter,
}
var err error
ssp.inst, err = observ.NewSSP(nextSimpleProcessorID())
if err != nil {
otel.Handle(err)
}
global.Warn("SimpleSpanProcessor is not recommended for production use, consider using BatchSpanProcessor instead.")
return ssp
}
var simpleProcessorIDCounter atomic.Int64
// nextSimpleProcessorID returns an identifier for this simple span processor,
// starting with 0 and incrementing by 1 each time it is called.
func nextSimpleProcessorID() int64 {
return simpleProcessorIDCounter.Add(1) - 1
}
// OnStart does nothing.
func (*simpleSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}
@@ -46,11 +66,20 @@ func (ssp *simpleSpanProcessor) OnEnd(s ReadOnlySpan) {
ssp.exporterMu.Lock()
defer ssp.exporterMu.Unlock()
var err error
if ssp.exporter != nil && s.SpanContext().TraceFlags().IsSampled() {
if err := ssp.exporter.ExportSpans(context.Background(), []ReadOnlySpan{s}); err != nil {
err = ssp.exporter.ExportSpans(context.Background(), []ReadOnlySpan{s})
if err != nil {
otel.Handle(err)
}
}
if ssp.inst != nil {
// Add the span to the context to ensure the metric is recorded
// with the correct span context.
ctx := trace.ContextWithSpanContext(context.Background(), s.SpanContext())
ssp.inst.SpanProcessed(ctx, err)
}
}
// Shutdown shuts down the exporter this SimpleSpanProcessor exports to.

View File

@@ -6,11 +6,23 @@ package trace
import (
"context"
"errors"
"strconv"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)
type simpleTestExporter struct {
@@ -34,6 +46,17 @@ func (t *simpleTestExporter) Shutdown(ctx context.Context) error {
}
}
var _ SpanExporter = (*failingTestExporter)(nil)
type failingTestExporter struct {
simpleTestExporter
}
func (f *failingTestExporter) ExportSpans(ctx context.Context, spans []ReadOnlySpan) error {
_ = f.simpleTestExporter.ExportSpans(ctx, spans)
return errors.New("failed to export spans")
}
var _ SpanExporter = (*simpleTestExporter)(nil)
func TestNewSimpleSpanProcessor(t *testing.T) {
@@ -168,3 +191,140 @@ func TestSimpleSpanProcessorShutdownHonorsContextCancel(t *testing.T) {
t.Errorf("SimpleSpanProcessor.Shutdown did not return %v, got %v", want, got)
}
}
func TestSimpleSpanProcessorObservability(t *testing.T) {
tests := []struct {
name string
enabled bool
exporter SpanExporter
assertMetrics func(t *testing.T, rm metricdata.ResourceMetrics)
}{
{
name: "Disabled",
enabled: false,
exporter: &simpleTestExporter{},
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Empty(t, rm.ScopeMetrics)
},
},
{
name: "Enabled",
enabled: true,
exporter: &simpleTestExporter{},
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Len(t, rm.ScopeMetrics, 1)
sm := rm.ScopeMetrics[0]
want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: "go.opentelemetry.io/otel/sdk/trace/internal/observ",
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: otelconv.SDKProcessorSpanProcessed{}.Name(),
Description: otelconv.SDKProcessorSpanProcessed{}.Description(),
Unit: otelconv.SDKProcessorSpanProcessed{}.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
semconv.OTelComponentName("simple_span_processor/0"),
semconv.OTelComponentTypeKey.String("simple_span_processor"),
),
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
}
metricdatatest.AssertEqual(
t,
want,
sm,
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreExemplars(),
)
},
},
{
name: "Enabled, Exporter error",
enabled: true,
exporter: &failingTestExporter{
simpleTestExporter: simpleTestExporter{},
},
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Len(t, rm.ScopeMetrics, 1)
sm := rm.ScopeMetrics[0]
want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: "go.opentelemetry.io/otel/sdk/trace/internal/observ",
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: otelconv.SDKProcessorSpanProcessed{}.Name(),
Description: otelconv.SDKProcessorSpanProcessed{}.Description(),
Unit: otelconv.SDKProcessorSpanProcessed{}.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
semconv.OTelComponentName("simple_span_processor/0"),
semconv.OTelComponentTypeKey.String("simple_span_processor"),
semconv.ErrorTypeKey.String("*errors.errorString"),
),
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
}
metricdatatest.AssertEqual(
t,
want,
sm,
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreExemplars(),
)
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
t.Setenv("OTEL_GO_X_OBSERVABILITY", strconv.FormatBool(test.enabled))
original := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(original) })
r := metric.NewManualReader()
mp := metric.NewMeterProvider(
metric.WithReader(r),
metric.WithView(dropSpanMetricsView),
)
otel.SetMeterProvider(mp)
ssp := NewSimpleSpanProcessor(test.exporter)
tp := basicTracerProvider(t)
tp.RegisterSpanProcessor(ssp)
startSpan(tp, test.name).End()
var rm metricdata.ResourceMetrics
require.NoError(t, r.Collect(t.Context(), &rm))
test.assertMetrics(t, rm)
simpleProcessorIDCounter.Store(0) // reset simpleProcessorIDCounter
})
}
}