You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-10-08 23:21:56 +02:00
Encapsulate stdouttrace.Exporter
instrumentation in internal package (#7307)
[Follow
guidelines](a5dcd68ebb/CONTRIBUTING.md (encapsulation)
)
and move instrumentation into its own type.
This commit is contained in:
@@ -8,3 +8,4 @@ nam
|
||||
valu
|
||||
thirdparty
|
||||
addOpt
|
||||
observ
|
||||
|
205
exporters/stdout/stdouttrace/internal/observ/instrumentation.go
Normal file
205
exporters/stdout/stdouttrace/internal/observ/instrumentation.go
Normal file
@@ -0,0 +1,205 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
// Package observ provides experimental observability instrumentation
|
||||
// for the stdout trace exporter.
|
||||
package observ // import "go.opentelemetry.io/otel/exporters/stdout/stdouttrace/internal/observ"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace/internal"
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace/internal/x"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
|
||||
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
|
||||
)
|
||||
|
||||
const (
|
||||
// ComponentType uniquely identifies the OpenTelemetry Exporter component
|
||||
// being instrumented.
|
||||
//
|
||||
// The STDOUT trace exporter is not a standardized OTel component type, so
|
||||
// it uses the Go package prefixed type name to ensure uniqueness and
|
||||
// identity.
|
||||
ComponentType = "go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter"
|
||||
|
||||
// ScopeName is the unique name of the meter used for instrumentation.
|
||||
ScopeName = "go.opentelemetry.io/otel/exporters/stdout/stdouttrace/internal/observ"
|
||||
|
||||
// SchemaURL is the schema URL of the metrics produced by this
|
||||
// instrumentation.
|
||||
SchemaURL = semconv.SchemaURL
|
||||
|
||||
// Version is the current version of this instrumentation.
|
||||
//
|
||||
// This matches the version of the exporter.
|
||||
Version = internal.Version
|
||||
)
|
||||
|
||||
var (
|
||||
measureAttrsPool = &sync.Pool{
|
||||
New: func() any {
|
||||
// "component.name" + "component.type" + "error.type"
|
||||
const n = 1 + 1 + 1
|
||||
s := make([]attribute.KeyValue, 0, n)
|
||||
// Return a pointer to a slice instead of a slice itself
|
||||
// to avoid allocations on every call.
|
||||
return &s
|
||||
},
|
||||
}
|
||||
|
||||
addOptPool = &sync.Pool{
|
||||
New: func() any {
|
||||
const n = 1 // WithAttributeSet
|
||||
o := make([]metric.AddOption, 0, n)
|
||||
return &o
|
||||
},
|
||||
}
|
||||
|
||||
recordOptPool = &sync.Pool{
|
||||
New: func() any {
|
||||
const n = 1 // WithAttributeSet
|
||||
o := make([]metric.RecordOption, 0, n)
|
||||
return &o
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func get[T any](p *sync.Pool) *[]T { return p.Get().(*[]T) }
|
||||
|
||||
func put[T any](p *sync.Pool, s *[]T) {
|
||||
*s = (*s)[:0] // Reset.
|
||||
p.Put(s)
|
||||
}
|
||||
|
||||
func ComponentName(id int64) string {
|
||||
return fmt.Sprintf("%s/%d", ComponentType, id)
|
||||
}
|
||||
|
||||
// Instrumentation is experimental instrumentation for the exporter.
|
||||
type Instrumentation struct {
|
||||
inflightSpans metric.Int64UpDownCounter
|
||||
exportedSpans metric.Int64Counter
|
||||
opDuration metric.Float64Histogram
|
||||
|
||||
attrs []attribute.KeyValue
|
||||
setOpt metric.MeasurementOption
|
||||
}
|
||||
|
||||
// NewInstrumentation returns instrumentation for a STDOUT trace exporter with
|
||||
// the provided ID using the global MeterProvider.
|
||||
//
|
||||
// If the experimental observability is disabled, nil is returned.
|
||||
func NewInstrumentation(id int64) (*Instrumentation, error) {
|
||||
if !x.SelfObservability.Enabled() {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
i := &Instrumentation{
|
||||
attrs: []attribute.KeyValue{
|
||||
semconv.OTelComponentName(ComponentName(id)),
|
||||
semconv.OTelComponentTypeKey.String(ComponentType),
|
||||
},
|
||||
}
|
||||
|
||||
s := attribute.NewSet(i.attrs...)
|
||||
i.setOpt = metric.WithAttributeSet(s)
|
||||
|
||||
mp := otel.GetMeterProvider()
|
||||
m := mp.Meter(
|
||||
ScopeName,
|
||||
metric.WithInstrumentationVersion(Version),
|
||||
metric.WithSchemaURL(SchemaURL),
|
||||
)
|
||||
|
||||
var err error
|
||||
|
||||
inflightSpans, e := otelconv.NewSDKExporterSpanInflight(m)
|
||||
if e != nil {
|
||||
e = fmt.Errorf("failed to create span inflight metric: %w", e)
|
||||
err = errors.Join(err, e)
|
||||
}
|
||||
i.inflightSpans = inflightSpans.Inst()
|
||||
|
||||
exportedSpans, e := otelconv.NewSDKExporterSpanExported(m)
|
||||
if e != nil {
|
||||
e = fmt.Errorf("failed to create span exported metric: %w", e)
|
||||
err = errors.Join(err, e)
|
||||
}
|
||||
i.exportedSpans = exportedSpans.Inst()
|
||||
|
||||
opDuration, e := otelconv.NewSDKExporterOperationDuration(m)
|
||||
if e != nil {
|
||||
e = fmt.Errorf("failed to create operation duration metric: %w", e)
|
||||
err = errors.Join(err, e)
|
||||
}
|
||||
i.opDuration = opDuration.Inst()
|
||||
|
||||
return i, err
|
||||
}
|
||||
|
||||
// ExportSpansDone is a function that is called when a call to an Exporter's
|
||||
// ExportSpans method completes.
|
||||
//
|
||||
// The number of successful exports is provided as success. Any error that is
|
||||
// encountered is provided as err.
|
||||
type ExportSpansDone func(success int64, err error)
|
||||
|
||||
// ExportSpans instruments the ExportSpans method of the exporter. It returns a
|
||||
// function that needs to be deferred so it is called when the method returns.
|
||||
func (i *Instrumentation) ExportSpans(ctx context.Context, nSpans int) ExportSpansDone {
|
||||
start := time.Now()
|
||||
|
||||
addOpt := get[metric.AddOption](addOptPool)
|
||||
defer put(addOptPool, addOpt)
|
||||
*addOpt = append(*addOpt, i.setOpt)
|
||||
i.inflightSpans.Add(ctx, int64(nSpans), *addOpt...)
|
||||
|
||||
return i.end(ctx, start, int64(nSpans))
|
||||
}
|
||||
|
||||
func (i *Instrumentation) end(ctx context.Context, start time.Time, n int64) ExportSpansDone {
|
||||
return func(success int64, err error) {
|
||||
addOpt := get[metric.AddOption](addOptPool)
|
||||
defer put(addOptPool, addOpt)
|
||||
*addOpt = append(*addOpt, i.setOpt)
|
||||
|
||||
i.inflightSpans.Add(ctx, -n, *addOpt...)
|
||||
|
||||
// Record the success and duration of the operation.
|
||||
//
|
||||
// Do not exclude 0 values, as they are valid and indicate no spans
|
||||
// were exported which is meaningful for certain aggregations.
|
||||
i.exportedSpans.Add(ctx, success, *addOpt...)
|
||||
|
||||
mOpt := i.setOpt
|
||||
if err != nil {
|
||||
attrs := get[attribute.KeyValue](measureAttrsPool)
|
||||
defer put(measureAttrsPool, attrs)
|
||||
*attrs = append(*attrs, i.attrs...)
|
||||
*attrs = append(*attrs, semconv.ErrorType(err))
|
||||
|
||||
// Do not inefficiently make a copy of attrs by using
|
||||
// WithAttributes instead of WithAttributeSet.
|
||||
set := attribute.NewSet(*attrs...)
|
||||
mOpt = metric.WithAttributeSet(set)
|
||||
|
||||
// Reset addOpt with new attribute set.
|
||||
*addOpt = append((*addOpt)[:0], mOpt)
|
||||
|
||||
i.exportedSpans.Add(ctx, n-success, *addOpt...)
|
||||
}
|
||||
|
||||
recordOpt := get[metric.RecordOption](recordOptPool)
|
||||
defer put(recordOptPool, recordOpt)
|
||||
*recordOpt = append(*recordOpt, mOpt)
|
||||
i.opDuration.Record(ctx, time.Since(start).Seconds(), *recordOpt...)
|
||||
}
|
||||
}
|
@@ -0,0 +1,239 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package observ_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace/internal/observ"
|
||||
mapi "go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
|
||||
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
|
||||
)
|
||||
|
||||
const ID = 0
|
||||
|
||||
var Scope = instrumentation.Scope{
|
||||
Name: observ.ScopeName,
|
||||
Version: observ.Version,
|
||||
SchemaURL: observ.SchemaURL,
|
||||
}
|
||||
|
||||
type errMeterProvider struct {
|
||||
mapi.MeterProvider
|
||||
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter {
|
||||
return &errMeter{err: m.err}
|
||||
}
|
||||
|
||||
type errMeter struct {
|
||||
mapi.Meter
|
||||
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *errMeter) Int64UpDownCounter(string, ...mapi.Int64UpDownCounterOption) (mapi.Int64UpDownCounter, error) {
|
||||
return nil, m.err
|
||||
}
|
||||
|
||||
func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) {
|
||||
return nil, m.err
|
||||
}
|
||||
|
||||
func (m *errMeter) Float64Histogram(string, ...mapi.Float64HistogramOption) (mapi.Float64Histogram, error) {
|
||||
return nil, m.err
|
||||
}
|
||||
|
||||
func TestNewInstrumentationObservabiltyErrors(t *testing.T) {
|
||||
orig := otel.GetMeterProvider()
|
||||
t.Cleanup(func() { otel.SetMeterProvider(orig) })
|
||||
mp := &errMeterProvider{err: assert.AnError}
|
||||
otel.SetMeterProvider(mp)
|
||||
|
||||
t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true")
|
||||
|
||||
_, err := observ.NewInstrumentation(ID)
|
||||
require.ErrorIs(t, err, assert.AnError, "new instrument errors")
|
||||
|
||||
assert.ErrorContains(t, err, "inflight metric")
|
||||
assert.ErrorContains(t, err, "span exported metric")
|
||||
assert.ErrorContains(t, err, "operation duration metric")
|
||||
}
|
||||
|
||||
func TestNewInstrumentationObservabiltyDisabled(t *testing.T) {
|
||||
// Do not set OTEL_GO_X_SELF_OBSERVABILITY.
|
||||
got, err := observ.NewInstrumentation(ID)
|
||||
assert.NoError(t, err)
|
||||
assert.Nil(t, got)
|
||||
}
|
||||
|
||||
func setup(t *testing.T) (*observ.Instrumentation, func() metricdata.ScopeMetrics) {
|
||||
t.Helper()
|
||||
|
||||
t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true")
|
||||
|
||||
original := otel.GetMeterProvider()
|
||||
t.Cleanup(func() { otel.SetMeterProvider(original) })
|
||||
|
||||
r := metric.NewManualReader()
|
||||
mp := metric.NewMeterProvider(metric.WithReader(r))
|
||||
otel.SetMeterProvider(mp)
|
||||
|
||||
inst, err := observ.NewInstrumentation(ID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, inst)
|
||||
|
||||
return inst, func() metricdata.ScopeMetrics {
|
||||
var rm metricdata.ResourceMetrics
|
||||
require.NoError(t, r.Collect(context.Background(), &rm))
|
||||
|
||||
require.Len(t, rm.ScopeMetrics, 1)
|
||||
return rm.ScopeMetrics[0]
|
||||
}
|
||||
}
|
||||
|
||||
func set(err error) attribute.Set {
|
||||
attrs := []attribute.KeyValue{
|
||||
semconv.OTelComponentName(observ.ComponentName(ID)),
|
||||
semconv.OTelComponentTypeKey.String(observ.ComponentType),
|
||||
}
|
||||
if err != nil {
|
||||
attrs = append(attrs, semconv.ErrorType(err))
|
||||
}
|
||||
return attribute.NewSet(attrs...)
|
||||
}
|
||||
|
||||
func spanInflight() metricdata.Metrics {
|
||||
return metricdata.Metrics{
|
||||
Name: otelconv.SDKExporterSpanInflight{}.Name(),
|
||||
Description: otelconv.SDKExporterSpanInflight{}.Description(),
|
||||
Unit: otelconv.SDKExporterSpanInflight{}.Unit(),
|
||||
Data: metricdata.Sum[int64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{Attributes: set(nil), Value: 0},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func spanExported(success, total int64, err error) metricdata.Metrics {
|
||||
dp := []metricdata.DataPoint[int64]{
|
||||
{Attributes: set(nil), Value: success},
|
||||
}
|
||||
if err != nil {
|
||||
dp = append(dp, metricdata.DataPoint[int64]{
|
||||
Attributes: set(err),
|
||||
Value: total - success,
|
||||
})
|
||||
}
|
||||
return metricdata.Metrics{
|
||||
Name: otelconv.SDKExporterSpanExported{}.Name(),
|
||||
Description: otelconv.SDKExporterSpanExported{}.Description(),
|
||||
Unit: otelconv.SDKExporterSpanExported{}.Unit(),
|
||||
Data: metricdata.Sum[int64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: true,
|
||||
DataPoints: dp,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func operationDuration(err error) metricdata.Metrics {
|
||||
return metricdata.Metrics{
|
||||
Name: otelconv.SDKExporterOperationDuration{}.Name(),
|
||||
Description: otelconv.SDKExporterOperationDuration{}.Description(),
|
||||
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
|
||||
Data: metricdata.Histogram[float64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
DataPoints: []metricdata.HistogramDataPoint[float64]{
|
||||
{Attributes: set(err)},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func assertMetrics(t *testing.T, got metricdata.ScopeMetrics, spans, success int64, err error) {
|
||||
t.Helper()
|
||||
|
||||
assert.Equal(t, Scope, got.Scope, "unexpected scope")
|
||||
|
||||
m := got.Metrics
|
||||
require.Len(t, m, 3, "expected 3 metrics")
|
||||
|
||||
o := metricdatatest.IgnoreTimestamp()
|
||||
want := spanInflight()
|
||||
metricdatatest.AssertEqual(t, want, m[0], o)
|
||||
|
||||
want = spanExported(success, spans, err)
|
||||
metricdatatest.AssertEqual(t, want, m[1], o)
|
||||
|
||||
want = operationDuration(err)
|
||||
metricdatatest.AssertEqual(t, want, m[2], o, metricdatatest.IgnoreValue())
|
||||
}
|
||||
|
||||
func TestInstrumentationExportSpans(t *testing.T) {
|
||||
inst, collect := setup(t)
|
||||
|
||||
const n = 10
|
||||
end := inst.ExportSpans(context.Background(), n)
|
||||
end(n, nil)
|
||||
|
||||
assertMetrics(t, collect(), n, n, nil)
|
||||
}
|
||||
|
||||
func TestInstrumentationExportSpansAllErrored(t *testing.T) {
|
||||
inst, collect := setup(t)
|
||||
|
||||
const n = 10
|
||||
end := inst.ExportSpans(context.Background(), n)
|
||||
const success = 0
|
||||
end(success, assert.AnError)
|
||||
|
||||
assertMetrics(t, collect(), n, success, assert.AnError)
|
||||
}
|
||||
|
||||
func TestInstrumentationExportSpansPartialErrored(t *testing.T) {
|
||||
inst, collect := setup(t)
|
||||
|
||||
const n = 10
|
||||
end := inst.ExportSpans(context.Background(), n)
|
||||
const success = 5
|
||||
end(success, assert.AnError)
|
||||
|
||||
assertMetrics(t, collect(), n, success, assert.AnError)
|
||||
}
|
||||
|
||||
func BenchmarkInstrumentationExportSpans(b *testing.B) {
|
||||
b.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true")
|
||||
inst, err := observ.NewInstrumentation(ID)
|
||||
if err != nil {
|
||||
b.Fatalf("failed to create instrumentation: %v", err)
|
||||
}
|
||||
|
||||
var end observ.ExportSpansDone
|
||||
err = errors.New("benchmark error")
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
end = inst.ExportSpans(context.Background(), 10)
|
||||
end(4, err)
|
||||
}
|
||||
_ = end
|
||||
}
|
8
exporters/stdout/stdouttrace/internal/version.go
Normal file
8
exporters/stdout/stdouttrace/internal/version.go
Normal file
@@ -0,0 +1,8 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package internal // import "go.opentelemetry.io/otel/exporters/stdout/stdouttrace/internal"
|
||||
|
||||
// Version is the current release version of the OpenTelemetry stdouttrace
|
||||
// exporter in use.
|
||||
const Version = "1.38.0"
|
@@ -11,23 +11,12 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace/internal/counter"
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace/internal/x"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/sdk"
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace/internal/observ"
|
||||
"go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/sdk/trace/tracetest"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
|
||||
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
|
||||
)
|
||||
|
||||
// otelComponentType is a name identifying the type of the OpenTelemetry
|
||||
// component. It is not a standardized OTel component type, so it uses the
|
||||
// Go package prefixed type name to ensure uniqueness and identity.
|
||||
const otelComponentType = "go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter"
|
||||
|
||||
var zeroTime time.Time
|
||||
|
||||
var _ trace.SpanExporter = &Exporter{}
|
||||
@@ -46,39 +35,8 @@ func New(options ...Option) (*Exporter, error) {
|
||||
timestamps: cfg.Timestamps,
|
||||
}
|
||||
|
||||
if !x.SelfObservability.Enabled() {
|
||||
return exporter, nil
|
||||
}
|
||||
|
||||
exporter.selfObservabilityEnabled = true
|
||||
exporter.selfObservabilityAttrs = []attribute.KeyValue{
|
||||
semconv.OTelComponentName(fmt.Sprintf("%s/%d", otelComponentType, counter.NextExporterID())),
|
||||
semconv.OTelComponentTypeKey.String(otelComponentType),
|
||||
}
|
||||
s := attribute.NewSet(exporter.selfObservabilityAttrs...)
|
||||
exporter.selfObservabilitySetOpt = metric.WithAttributeSet(s)
|
||||
|
||||
mp := otel.GetMeterProvider()
|
||||
m := mp.Meter(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace",
|
||||
metric.WithInstrumentationVersion(sdk.Version()),
|
||||
metric.WithSchemaURL(semconv.SchemaURL),
|
||||
)
|
||||
|
||||
var err, e error
|
||||
if exporter.spanInflightMetric, e = otelconv.NewSDKExporterSpanInflight(m); e != nil {
|
||||
e = fmt.Errorf("failed to create span inflight metric: %w", e)
|
||||
err = errors.Join(err, e)
|
||||
}
|
||||
if exporter.spanExportedMetric, e = otelconv.NewSDKExporterSpanExported(m); e != nil {
|
||||
e = fmt.Errorf("failed to create span exported metric: %w", e)
|
||||
err = errors.Join(err, e)
|
||||
}
|
||||
if exporter.operationDurationMetric, e = otelconv.NewSDKExporterOperationDuration(m); e != nil {
|
||||
e = fmt.Errorf("failed to create operation duration metric: %w", e)
|
||||
err = errors.Join(err, e)
|
||||
}
|
||||
|
||||
var err error
|
||||
exporter.inst, err = observ.NewInstrumentation(counter.NextExporterID())
|
||||
return exporter, err
|
||||
}
|
||||
|
||||
@@ -91,107 +49,15 @@ type Exporter struct {
|
||||
stoppedMu sync.RWMutex
|
||||
stopped bool
|
||||
|
||||
selfObservabilityEnabled bool
|
||||
selfObservabilityAttrs []attribute.KeyValue // selfObservability common attributes
|
||||
selfObservabilitySetOpt metric.MeasurementOption
|
||||
spanInflightMetric otelconv.SDKExporterSpanInflight
|
||||
spanExportedMetric otelconv.SDKExporterSpanExported
|
||||
operationDurationMetric otelconv.SDKExporterOperationDuration
|
||||
inst *observ.Instrumentation
|
||||
}
|
||||
|
||||
var (
|
||||
measureAttrsPool = sync.Pool{
|
||||
New: func() any {
|
||||
// "component.name" + "component.type" + "error.type"
|
||||
const n = 1 + 1 + 1
|
||||
s := make([]attribute.KeyValue, 0, n)
|
||||
// Return a pointer to a slice instead of a slice itself
|
||||
// to avoid allocations on every call.
|
||||
return &s
|
||||
},
|
||||
}
|
||||
|
||||
addOptPool = &sync.Pool{
|
||||
New: func() any {
|
||||
const n = 1 // WithAttributeSet
|
||||
o := make([]metric.AddOption, 0, n)
|
||||
return &o
|
||||
},
|
||||
}
|
||||
|
||||
recordOptPool = &sync.Pool{
|
||||
New: func() any {
|
||||
const n = 1 // WithAttributeSet
|
||||
o := make([]metric.RecordOption, 0, n)
|
||||
return &o
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
// ExportSpans writes spans in json format to stdout.
|
||||
func (e *Exporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) (err error) {
|
||||
var success int64
|
||||
if e.selfObservabilityEnabled {
|
||||
count := int64(len(spans))
|
||||
|
||||
addOpt := addOptPool.Get().(*[]metric.AddOption)
|
||||
defer func() {
|
||||
*addOpt = (*addOpt)[:0]
|
||||
addOptPool.Put(addOpt)
|
||||
}()
|
||||
|
||||
*addOpt = append(*addOpt, e.selfObservabilitySetOpt)
|
||||
|
||||
e.spanInflightMetric.Inst().Add(ctx, count, *addOpt...)
|
||||
defer func(starting time.Time) {
|
||||
e.spanInflightMetric.Inst().Add(ctx, -count, *addOpt...)
|
||||
|
||||
// Record the success and duration of the operation.
|
||||
//
|
||||
// Do not exclude 0 values, as they are valid and indicate no spans
|
||||
// were exported which is meaningful for certain aggregations.
|
||||
e.spanExportedMetric.Inst().Add(ctx, success, *addOpt...)
|
||||
|
||||
mOpt := e.selfObservabilitySetOpt
|
||||
if err != nil {
|
||||
// additional attributes for self-observability,
|
||||
// only spanExportedMetric and operationDurationMetric are supported.
|
||||
attrs := measureAttrsPool.Get().(*[]attribute.KeyValue)
|
||||
defer func() {
|
||||
*attrs = (*attrs)[:0] // reset the slice for reuse
|
||||
measureAttrsPool.Put(attrs)
|
||||
}()
|
||||
*attrs = append(*attrs, e.selfObservabilityAttrs...)
|
||||
*attrs = append(*attrs, semconv.ErrorType(err))
|
||||
|
||||
// Do not inefficiently make a copy of attrs by using
|
||||
// WithAttributes instead of WithAttributeSet.
|
||||
set := attribute.NewSet(*attrs...)
|
||||
mOpt = metric.WithAttributeSet(set)
|
||||
|
||||
// Reset addOpt with new attribute set.
|
||||
*addOpt = append((*addOpt)[:0], mOpt)
|
||||
|
||||
e.spanExportedMetric.Inst().Add(
|
||||
ctx,
|
||||
count-success,
|
||||
*addOpt...,
|
||||
)
|
||||
}
|
||||
|
||||
recordOpt := recordOptPool.Get().(*[]metric.RecordOption)
|
||||
defer func() {
|
||||
*recordOpt = (*recordOpt)[:0]
|
||||
recordOptPool.Put(recordOpt)
|
||||
}()
|
||||
|
||||
*recordOpt = append(*recordOpt, mOpt)
|
||||
e.operationDurationMetric.Inst().Record(
|
||||
ctx,
|
||||
time.Since(starting).Seconds(),
|
||||
*recordOpt...,
|
||||
)
|
||||
}(time.Now())
|
||||
if e.inst != nil {
|
||||
end := e.inst.ExportSpans(ctx, len(spans))
|
||||
defer func() { end(success, err) }()
|
||||
}
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
|
@@ -20,8 +20,7 @@ import (
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace/internal/counter"
|
||||
mapi "go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/sdk"
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace/internal/observ"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
@@ -274,9 +273,9 @@ func TestSelfObservability(t *testing.T) {
|
||||
require.Len(t, sm.Metrics, 3)
|
||||
|
||||
assert.Equal(t, instrumentation.Scope{
|
||||
Name: "go.opentelemetry.io/otel/exporters/stdout/stdouttrace",
|
||||
Version: sdk.Version(),
|
||||
SchemaURL: semconv.SchemaURL,
|
||||
Name: observ.ScopeName,
|
||||
Version: observ.Version,
|
||||
SchemaURL: observ.SchemaURL,
|
||||
}, sm.Scope)
|
||||
|
||||
metricdatatest.AssertEqual(t, metricdata.Metrics{
|
||||
@@ -288,12 +287,8 @@ func TestSelfObservability(t *testing.T) {
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(
|
||||
semconv.OTelComponentName(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter/0",
|
||||
),
|
||||
semconv.OTelComponentTypeKey.String(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter",
|
||||
),
|
||||
semconv.OTelComponentName(observ.ComponentName(0)),
|
||||
semconv.OTelComponentTypeKey.String(observ.ComponentType),
|
||||
),
|
||||
Value: 0,
|
||||
},
|
||||
@@ -311,12 +306,8 @@ func TestSelfObservability(t *testing.T) {
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(
|
||||
semconv.OTelComponentName(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter/0",
|
||||
),
|
||||
semconv.OTelComponentTypeKey.String(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter",
|
||||
),
|
||||
semconv.OTelComponentName(observ.ComponentName(0)),
|
||||
semconv.OTelComponentTypeKey.String(observ.ComponentType),
|
||||
),
|
||||
Value: 2,
|
||||
},
|
||||
@@ -333,259 +324,8 @@ func TestSelfObservability(t *testing.T) {
|
||||
DataPoints: []metricdata.HistogramDataPoint[float64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(
|
||||
semconv.OTelComponentName(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter/0",
|
||||
),
|
||||
semconv.OTelComponentTypeKey.String(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter",
|
||||
),
|
||||
),
|
||||
},
|
||||
},
|
||||
},
|
||||
}, sm.Metrics[2], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Enabled, but ExportSpans returns error",
|
||||
enabled: true,
|
||||
callExportSpans: func(t *testing.T, exporter *stdouttrace.Exporter) {
|
||||
t.Helper()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
err := exporter.ExportSpans(ctx, tracetest.SpanStubs{
|
||||
{Name: "/foo"},
|
||||
{Name: "/bar"},
|
||||
}.Snapshots())
|
||||
require.Error(t, err)
|
||||
},
|
||||
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
|
||||
t.Helper()
|
||||
require.Len(t, rm.ScopeMetrics, 1)
|
||||
|
||||
sm := rm.ScopeMetrics[0]
|
||||
require.Len(t, sm.Metrics, 3)
|
||||
|
||||
assert.Equal(t, instrumentation.Scope{
|
||||
Name: "go.opentelemetry.io/otel/exporters/stdout/stdouttrace",
|
||||
Version: sdk.Version(),
|
||||
SchemaURL: semconv.SchemaURL,
|
||||
}, sm.Scope)
|
||||
|
||||
metricdatatest.AssertEqual(t, metricdata.Metrics{
|
||||
Name: otelconv.SDKExporterSpanInflight{}.Name(),
|
||||
Description: otelconv.SDKExporterSpanInflight{}.Description(),
|
||||
Unit: otelconv.SDKExporterSpanInflight{}.Unit(),
|
||||
Data: metricdata.Sum[int64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(
|
||||
semconv.OTelComponentName(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter/0",
|
||||
),
|
||||
semconv.OTelComponentTypeKey.String(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter",
|
||||
),
|
||||
),
|
||||
Value: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, sm.Metrics[0], metricdatatest.IgnoreTimestamp())
|
||||
|
||||
metricdatatest.AssertEqual(t, metricdata.Metrics{
|
||||
Name: otelconv.SDKExporterSpanExported{}.Name(),
|
||||
Description: otelconv.SDKExporterSpanExported{}.Description(),
|
||||
Unit: otelconv.SDKExporterSpanExported{}.Unit(),
|
||||
Data: metricdata.Sum[int64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: true,
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(
|
||||
semconv.OTelComponentName(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter/0",
|
||||
),
|
||||
semconv.OTelComponentTypeKey.String(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter",
|
||||
),
|
||||
),
|
||||
Value: 0,
|
||||
},
|
||||
{
|
||||
Attributes: attribute.NewSet(
|
||||
semconv.OTelComponentName(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter/0",
|
||||
),
|
||||
semconv.OTelComponentTypeKey.String(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter",
|
||||
),
|
||||
semconv.ErrorType(context.Canceled),
|
||||
),
|
||||
Value: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, sm.Metrics[1], metricdatatest.IgnoreTimestamp())
|
||||
|
||||
metricdatatest.AssertEqual(t, metricdata.Metrics{
|
||||
Name: otelconv.SDKExporterOperationDuration{}.Name(),
|
||||
Description: otelconv.SDKExporterOperationDuration{}.Description(),
|
||||
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
|
||||
Data: metricdata.Histogram[float64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
DataPoints: []metricdata.HistogramDataPoint[float64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(
|
||||
semconv.OTelComponentName(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter/0",
|
||||
),
|
||||
semconv.OTelComponentTypeKey.String(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter",
|
||||
),
|
||||
semconv.ErrorType(context.Canceled),
|
||||
),
|
||||
},
|
||||
},
|
||||
},
|
||||
}, sm.Metrics[2], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "PartialExport",
|
||||
enabled: true,
|
||||
callExportSpans: func(t *testing.T, exporter *stdouttrace.Exporter) {
|
||||
t.Helper()
|
||||
|
||||
err := exporter.ExportSpans(context.Background(), tracetest.SpanStubs{
|
||||
{Name: "/foo"},
|
||||
{
|
||||
Name: "JSON encoder cannot marshal math.Inf(1)",
|
||||
Attributes: []attribute.KeyValue{attribute.Float64("", math.Inf(1))},
|
||||
},
|
||||
{Name: "/bar"},
|
||||
}.Snapshots())
|
||||
require.Error(t, err)
|
||||
},
|
||||
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
|
||||
t.Helper()
|
||||
require.Len(t, rm.ScopeMetrics, 1)
|
||||
|
||||
sm := rm.ScopeMetrics[0]
|
||||
require.Len(t, sm.Metrics, 3)
|
||||
|
||||
assert.Equal(t, instrumentation.Scope{
|
||||
Name: "go.opentelemetry.io/otel/exporters/stdout/stdouttrace",
|
||||
Version: sdk.Version(),
|
||||
SchemaURL: semconv.SchemaURL,
|
||||
}, sm.Scope)
|
||||
|
||||
metricdatatest.AssertEqual(t, metricdata.Metrics{
|
||||
Name: otelconv.SDKExporterSpanInflight{}.Name(),
|
||||
Description: otelconv.SDKExporterSpanInflight{}.Description(),
|
||||
Unit: otelconv.SDKExporterSpanInflight{}.Unit(),
|
||||
Data: metricdata.Sum[int64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(
|
||||
semconv.OTelComponentName(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter/0",
|
||||
),
|
||||
semconv.OTelComponentTypeKey.String(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter",
|
||||
),
|
||||
),
|
||||
Value: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, sm.Metrics[0], metricdatatest.IgnoreTimestamp())
|
||||
|
||||
require.IsType(t, metricdata.Sum[int64]{}, sm.Metrics[1].Data)
|
||||
sum := sm.Metrics[1].Data.(metricdata.Sum[int64])
|
||||
var found bool
|
||||
for i := range sum.DataPoints {
|
||||
sum.DataPoints[i].Attributes, _ = sum.DataPoints[i].Attributes.Filter(
|
||||
func(kv attribute.KeyValue) bool {
|
||||
if kv.Key == semconv.ErrorTypeKey {
|
||||
found = true
|
||||
return false
|
||||
}
|
||||
return true
|
||||
},
|
||||
)
|
||||
}
|
||||
assert.True(t, found, "missing error type attribute in span export metric")
|
||||
sm.Metrics[1].Data = sum
|
||||
|
||||
metricdatatest.AssertEqual(t, metricdata.Metrics{
|
||||
Name: otelconv.SDKExporterSpanExported{}.Name(),
|
||||
Description: otelconv.SDKExporterSpanExported{}.Description(),
|
||||
Unit: otelconv.SDKExporterSpanExported{}.Unit(),
|
||||
Data: metricdata.Sum[int64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: true,
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(
|
||||
semconv.OTelComponentName(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter/0",
|
||||
),
|
||||
semconv.OTelComponentTypeKey.String(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter",
|
||||
),
|
||||
),
|
||||
Value: 1,
|
||||
},
|
||||
{
|
||||
Attributes: attribute.NewSet(
|
||||
semconv.OTelComponentName(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter/0",
|
||||
),
|
||||
semconv.OTelComponentTypeKey.String(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter",
|
||||
),
|
||||
),
|
||||
Value: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, sm.Metrics[1], metricdatatest.IgnoreTimestamp())
|
||||
|
||||
require.IsType(t, metricdata.Histogram[float64]{}, sm.Metrics[2].Data)
|
||||
hist := sm.Metrics[2].Data.(metricdata.Histogram[float64])
|
||||
require.Len(t, hist.DataPoints, 1)
|
||||
found = false
|
||||
hist.DataPoints[0].Attributes, _ = hist.DataPoints[0].Attributes.Filter(
|
||||
func(kv attribute.KeyValue) bool {
|
||||
if kv.Key == semconv.ErrorTypeKey {
|
||||
found = true
|
||||
return false
|
||||
}
|
||||
return true
|
||||
},
|
||||
)
|
||||
assert.True(t, found, "missing error type attribute in operation duration metric")
|
||||
sm.Metrics[2].Data = hist
|
||||
|
||||
metricdatatest.AssertEqual(t, metricdata.Metrics{
|
||||
Name: otelconv.SDKExporterOperationDuration{}.Name(),
|
||||
Description: otelconv.SDKExporterOperationDuration{}.Description(),
|
||||
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
|
||||
Data: metricdata.Histogram[float64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
DataPoints: []metricdata.HistogramDataPoint[float64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(
|
||||
semconv.OTelComponentName(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter/0",
|
||||
),
|
||||
semconv.OTelComponentTypeKey.String(
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace.Exporter",
|
||||
),
|
||||
semconv.OTelComponentName(observ.ComponentName(0)),
|
||||
semconv.OTelComponentTypeKey.String(observ.ComponentType),
|
||||
),
|
||||
},
|
||||
},
|
||||
@@ -625,49 +365,6 @@ func TestSelfObservability(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
type errMeterProvider struct {
|
||||
mapi.MeterProvider
|
||||
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter {
|
||||
return &errMeter{err: m.err}
|
||||
}
|
||||
|
||||
type errMeter struct {
|
||||
mapi.Meter
|
||||
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *errMeter) Int64UpDownCounter(string, ...mapi.Int64UpDownCounterOption) (mapi.Int64UpDownCounter, error) {
|
||||
return nil, m.err
|
||||
}
|
||||
|
||||
func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) {
|
||||
return nil, m.err
|
||||
}
|
||||
|
||||
func (m *errMeter) Float64Histogram(string, ...mapi.Float64HistogramOption) (mapi.Float64Histogram, error) {
|
||||
return nil, m.err
|
||||
}
|
||||
|
||||
func TestSelfObservabilityInstrumentErrors(t *testing.T) {
|
||||
orig := otel.GetMeterProvider()
|
||||
t.Cleanup(func() { otel.SetMeterProvider(orig) })
|
||||
mp := &errMeterProvider{err: assert.AnError}
|
||||
otel.SetMeterProvider(mp)
|
||||
|
||||
t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true")
|
||||
_, err := stdouttrace.New()
|
||||
require.ErrorIs(t, err, assert.AnError, "new instrument errors")
|
||||
|
||||
assert.ErrorContains(t, err, "inflight metric")
|
||||
assert.ErrorContains(t, err, "span exported metric")
|
||||
assert.ErrorContains(t, err, "operation duration metric")
|
||||
}
|
||||
|
||||
func BenchmarkExporterExportSpans(b *testing.B) {
|
||||
ss := tracetest.SpanStubs{
|
||||
{Name: "/foo"},
|
||||
|
@@ -42,3 +42,7 @@ module-sets:
|
||||
excluded-modules:
|
||||
- go.opentelemetry.io/otel/internal/tools
|
||||
- go.opentelemetry.io/otel/trace/internal/telemetry/test
|
||||
modules:
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace:
|
||||
version-refs:
|
||||
- ./exporters/stdout/stdouttrace/internal/version.go
|
||||
|
Reference in New Issue
Block a user