You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-11-23 22:34:47 +02:00
Add the internal/observ package to otlptracegrpc (#7404)
- Part of #7007 - Contains a TODO tracking features added in #7401 - This package will be used to instrument `otlptracegrpc` in a follow-up PR ### Benchmarks ```terminal goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/observ cpu: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz │ otlptracegrpc-internal-observ.bmark.result │ │ sec/op │ InstrumentationExportSpans/NoError-8 143.8n ± 4% InstrumentationExportSpans/PartialError-8 1.747µ ± 6% InstrumentationExportSpans/FullError-8 1.737µ ± 11% geomean 758.4n │ otlptracegrpc-internal-observ.bmark.result │ │ B/op │ InstrumentationExportSpans/NoError-8 0.000 ± 0% InstrumentationExportSpans/PartialError-8 753.0 ± 0% InstrumentationExportSpans/FullError-8 753.0 ± 0% geomean ¹ ¹ summaries must be >0 to compute geomean │ otlptracegrpc-internal-observ.bmark.result │ │ allocs/op │ InstrumentationExportSpans/NoError-8 0.000 ± 0% InstrumentationExportSpans/PartialError-8 4.000 ± 0% InstrumentationExportSpans/FullError-8 4.000 ± 0% geomean ¹ ¹ summaries must be >0 to compute geomean ``` --------- Co-authored-by: Robert Pająk <pellared@hotmail.com>
This commit is contained in:
@@ -8,5 +8,6 @@ https?:\/\/github\.com\/open-telemetry\/semantic-conventions\/archive\/refs\/tag
|
||||
file:///home/runner/work/opentelemetry-go/opentelemetry-go/libraries
|
||||
file:///home/runner/work/opentelemetry-go/opentelemetry-go/manual
|
||||
http://4.3.2.1:78/user/123
|
||||
file:///home/runner/work/opentelemetry-go/opentelemetry-go/exporters/otlp/otlptrace/otlptracegrpc/internal/observ/dns:/:4317
|
||||
# URL works, but it has blocked link checkers.
|
||||
https://dl.acm.org/doi/10.1145/198429.198435
|
||||
|
||||
@@ -29,6 +29,17 @@ func (ps PartialSuccess) Error() string {
|
||||
return fmt.Sprintf("OTLP partial success: %s (%d %s rejected)", msg, ps.RejectedItems, ps.RejectedKind)
|
||||
}
|
||||
|
||||
// As returns true if ps can be assigned to target and makes the assignment.
|
||||
// Otherwise, it returns false. This supports the errors.As() interface.
|
||||
func (ps PartialSuccess) As(target any) bool {
|
||||
t, ok := target.(*PartialSuccess)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
*t = ps
|
||||
return true
|
||||
}
|
||||
|
||||
// Is supports the errors.Is() interface.
|
||||
func (ps PartialSuccess) Is(err error) bool {
|
||||
_, ok := err.(PartialSuccess)
|
||||
|
||||
@@ -29,6 +29,17 @@ func (ps PartialSuccess) Error() string {
|
||||
return fmt.Sprintf("OTLP partial success: %s (%d %s rejected)", msg, ps.RejectedItems, ps.RejectedKind)
|
||||
}
|
||||
|
||||
// As returns true if ps can be assigned to target and makes the assignment.
|
||||
// Otherwise, it returns false. This supports the errors.As() interface.
|
||||
func (ps PartialSuccess) As(target any) bool {
|
||||
t, ok := target.(*PartialSuccess)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
*t = ps
|
||||
return true
|
||||
}
|
||||
|
||||
// Is supports the errors.Is() interface.
|
||||
func (ps PartialSuccess) Is(err error) bool {
|
||||
_, ok := err.(PartialSuccess)
|
||||
|
||||
@@ -7,7 +7,9 @@ require (
|
||||
github.com/stretchr/testify v1.11.1
|
||||
go.opentelemetry.io/otel v1.38.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0
|
||||
go.opentelemetry.io/otel/metric v1.38.0
|
||||
go.opentelemetry.io/otel/sdk v1.38.0
|
||||
go.opentelemetry.io/otel/sdk/metric v1.38.0
|
||||
go.opentelemetry.io/otel/trace v1.38.0
|
||||
go.opentelemetry.io/proto/otlp v1.8.0
|
||||
go.uber.org/goleak v1.3.0
|
||||
@@ -24,7 +26,6 @@ require (
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.38.0 // indirect
|
||||
golang.org/x/net v0.44.0 // indirect
|
||||
golang.org/x/sys v0.36.0 // indirect
|
||||
golang.org/x/text v0.29.0 // indirect
|
||||
|
||||
@@ -0,0 +1,341 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/observ"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/x"
|
||||
"go.opentelemetry.io/otel/internal/global"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
|
||||
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
|
||||
)
|
||||
|
||||
const (
|
||||
// ScopeName is the unique name of the meter used for instrumentation.
|
||||
ScopeName = "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/observ"
|
||||
|
||||
// SchemaURL is the schema URL of the metrics produced by this
|
||||
// instrumentation.
|
||||
SchemaURL = semconv.SchemaURL
|
||||
|
||||
// Version is the current version of this instrumentation.
|
||||
//
|
||||
// This matches the version of the exporter.
|
||||
Version = internal.Version
|
||||
)
|
||||
|
||||
var (
|
||||
measureAttrsPool = &sync.Pool{
|
||||
New: func() any {
|
||||
const n = 1 + // component.name
|
||||
1 + // component.type
|
||||
1 + // server.addr
|
||||
1 + // server.port
|
||||
1 + // error.type
|
||||
1 // rpc.grpc.status_code
|
||||
s := make([]attribute.KeyValue, 0, n)
|
||||
// Return a pointer to a slice instead of a slice itself
|
||||
// to avoid allocations on every call.
|
||||
return &s
|
||||
},
|
||||
}
|
||||
|
||||
addOptPool = &sync.Pool{
|
||||
New: func() any {
|
||||
const n = 1 // WithAttributeSet
|
||||
o := make([]metric.AddOption, 0, n)
|
||||
return &o
|
||||
},
|
||||
}
|
||||
|
||||
recordOptPool = &sync.Pool{
|
||||
New: func() any {
|
||||
const n = 1 // WithAttributeSet
|
||||
o := make([]metric.RecordOption, 0, n)
|
||||
return &o
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func get[T any](p *sync.Pool) *[]T { return p.Get().(*[]T) }
|
||||
|
||||
func put[T any](p *sync.Pool, s *[]T) {
|
||||
*s = (*s)[:0] // Reset.
|
||||
p.Put(s)
|
||||
}
|
||||
|
||||
// ComponentName returns the component name for the exporter with the
|
||||
// provided ID.
|
||||
func ComponentName(id int64) string {
|
||||
t := semconv.OTelComponentTypeOtlpGRPCSpanExporter.Value.AsString()
|
||||
return fmt.Sprintf("%s/%d", t, id)
|
||||
}
|
||||
|
||||
// Instrumentation is experimental instrumentation for the exporter.
|
||||
type Instrumentation struct {
|
||||
inflightSpans metric.Int64UpDownCounter
|
||||
exportedSpans metric.Int64Counter
|
||||
opDuration metric.Float64Histogram
|
||||
|
||||
attrs []attribute.KeyValue
|
||||
addOpt metric.AddOption
|
||||
recOpt metric.RecordOption
|
||||
}
|
||||
|
||||
// NewInstrumentation returns instrumentation for an OTLP over gPRC trace
|
||||
// exporter with the provided ID using the global MeterProvider.
|
||||
//
|
||||
// The id should be the unique exporter instance ID. It is used
|
||||
// to set the "component.name" attribute.
|
||||
//
|
||||
// The target is the endpoint the exporter is exporting to.
|
||||
//
|
||||
// If the experimental observability is disabled, nil is returned.
|
||||
func NewInstrumentation(id int64, target string) (*Instrumentation, error) {
|
||||
if !x.Observability.Enabled() {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
attrs := BaseAttrs(id, target)
|
||||
i := &Instrumentation{
|
||||
attrs: attrs,
|
||||
addOpt: metric.WithAttributeSet(attribute.NewSet(attrs...)),
|
||||
|
||||
// Do not modify attrs (NewSet sorts in-place), make a new slice.
|
||||
recOpt: metric.WithAttributeSet(attribute.NewSet(append(
|
||||
// Default to OK status code.
|
||||
[]attribute.KeyValue{semconv.RPCGRPCStatusCodeOk},
|
||||
attrs...,
|
||||
)...)),
|
||||
}
|
||||
|
||||
mp := otel.GetMeterProvider()
|
||||
m := mp.Meter(
|
||||
ScopeName,
|
||||
metric.WithInstrumentationVersion(Version),
|
||||
metric.WithSchemaURL(SchemaURL),
|
||||
)
|
||||
|
||||
var err error
|
||||
|
||||
inflightSpans, e := otelconv.NewSDKExporterSpanInflight(m)
|
||||
if e != nil {
|
||||
e = fmt.Errorf("failed to create span inflight metric: %w", e)
|
||||
err = errors.Join(err, e)
|
||||
}
|
||||
i.inflightSpans = inflightSpans.Inst()
|
||||
|
||||
exportedSpans, e := otelconv.NewSDKExporterSpanExported(m)
|
||||
if e != nil {
|
||||
e = fmt.Errorf("failed to create span exported metric: %w", e)
|
||||
err = errors.Join(err, e)
|
||||
}
|
||||
i.exportedSpans = exportedSpans.Inst()
|
||||
|
||||
opDuration, e := otelconv.NewSDKExporterOperationDuration(m)
|
||||
if e != nil {
|
||||
e = fmt.Errorf("failed to create operation duration metric: %w", e)
|
||||
err = errors.Join(err, e)
|
||||
}
|
||||
i.opDuration = opDuration.Inst()
|
||||
|
||||
return i, err
|
||||
}
|
||||
|
||||
// BaseAttrs returns the base attributes for the exporter with the provided ID
|
||||
// and target.
|
||||
//
|
||||
// The id should be the unique exporter instance ID. It is used
|
||||
// to set the "component.name" attribute.
|
||||
//
|
||||
// The target is the gRPC target the exporter is exporting to. It is expected
|
||||
// to be the output of the Client's CanonicalTarget method.
|
||||
func BaseAttrs(id int64, target string) []attribute.KeyValue {
|
||||
host, port, err := ParseCanonicalTarget(target)
|
||||
if err != nil || (host == "" && port < 0) {
|
||||
if err != nil {
|
||||
global.Debug("failed to parse target", "target", target, "error", err)
|
||||
}
|
||||
return []attribute.KeyValue{
|
||||
semconv.OTelComponentName(ComponentName(id)),
|
||||
semconv.OTelComponentTypeOtlpGRPCSpanExporter,
|
||||
}
|
||||
}
|
||||
|
||||
// Do not use append so the slice is exactly allocated.
|
||||
|
||||
if port < 0 {
|
||||
return []attribute.KeyValue{
|
||||
semconv.OTelComponentName(ComponentName(id)),
|
||||
semconv.OTelComponentTypeOtlpGRPCSpanExporter,
|
||||
semconv.ServerAddress(host),
|
||||
}
|
||||
}
|
||||
|
||||
if host == "" {
|
||||
return []attribute.KeyValue{
|
||||
semconv.OTelComponentName(ComponentName(id)),
|
||||
semconv.OTelComponentTypeOtlpGRPCSpanExporter,
|
||||
semconv.ServerPort(port),
|
||||
}
|
||||
}
|
||||
|
||||
return []attribute.KeyValue{
|
||||
semconv.OTelComponentName(ComponentName(id)),
|
||||
semconv.OTelComponentTypeOtlpGRPCSpanExporter,
|
||||
semconv.ServerAddress(host),
|
||||
semconv.ServerPort(port),
|
||||
}
|
||||
}
|
||||
|
||||
// ExportSpans instruments the ExportSpans method of the exporter. It returns
|
||||
// an [ExportOp] that must have its [ExportOp.End] method called when the
|
||||
// ExportSpans method returns.
|
||||
func (i *Instrumentation) ExportSpans(ctx context.Context, nSpans int) ExportOp {
|
||||
start := time.Now()
|
||||
|
||||
addOpt := get[metric.AddOption](addOptPool)
|
||||
defer put(addOptPool, addOpt)
|
||||
*addOpt = append(*addOpt, i.addOpt)
|
||||
i.inflightSpans.Add(ctx, int64(nSpans), *addOpt...)
|
||||
|
||||
return ExportOp{
|
||||
ctx: ctx,
|
||||
start: start,
|
||||
nSpans: int64(nSpans),
|
||||
inst: i,
|
||||
}
|
||||
}
|
||||
|
||||
// ExportOp tracks the operation being observed by [Instrumentation.ExportSpans].
|
||||
type ExportOp struct {
|
||||
ctx context.Context
|
||||
start time.Time
|
||||
nSpans int64
|
||||
|
||||
inst *Instrumentation
|
||||
}
|
||||
|
||||
// End completes the observation of the operation being observed by a call to
|
||||
// [Instrumentation.ExportSpans].
|
||||
//
|
||||
// Any error that is encountered is provided as err.
|
||||
//
|
||||
// If err is not nil, all spans will be recorded as failures unless error is of
|
||||
// type [internal.PartialSuccess]. In the case of a PartialSuccess, the number
|
||||
// of successfully exported spans will be determined by inspecting the
|
||||
// RejectedItems field of the PartialSuccess.
|
||||
func (e ExportOp) End(err error, code codes.Code) {
|
||||
addOpt := get[metric.AddOption](addOptPool)
|
||||
defer put(addOptPool, addOpt)
|
||||
*addOpt = append(*addOpt, e.inst.addOpt)
|
||||
|
||||
e.inst.inflightSpans.Add(e.ctx, -e.nSpans, *addOpt...)
|
||||
|
||||
success := successful(e.nSpans, err)
|
||||
// Record successfully exported spans, even if the value is 0 which are
|
||||
// meaningful to distribution aggregations.
|
||||
e.inst.exportedSpans.Add(e.ctx, success, *addOpt...)
|
||||
|
||||
if err != nil {
|
||||
attrs := get[attribute.KeyValue](measureAttrsPool)
|
||||
defer put(measureAttrsPool, attrs)
|
||||
*attrs = append(*attrs, e.inst.attrs...)
|
||||
*attrs = append(*attrs, semconv.ErrorType(err))
|
||||
|
||||
// Do not inefficiently make a copy of attrs by using
|
||||
// WithAttributes instead of WithAttributeSet.
|
||||
o := metric.WithAttributeSet(attribute.NewSet(*attrs...))
|
||||
// Reset addOpt with new attribute set.
|
||||
*addOpt = append((*addOpt)[:0], o)
|
||||
|
||||
e.inst.exportedSpans.Add(e.ctx, e.nSpans-success, *addOpt...)
|
||||
}
|
||||
|
||||
recOpt := get[metric.RecordOption](recordOptPool)
|
||||
defer put(recordOptPool, recOpt)
|
||||
*recOpt = append(*recOpt, e.inst.recordOption(err, code))
|
||||
|
||||
d := time.Since(e.start).Seconds()
|
||||
e.inst.opDuration.Record(e.ctx, d, *recOpt...)
|
||||
}
|
||||
|
||||
// recordOption returns a RecordOption with attributes representing the
|
||||
// outcome of the operation being recorded.
|
||||
//
|
||||
// If err is nil and code is codes.OK, the default recOpt of the
|
||||
// Instrumentation is returned.
|
||||
//
|
||||
// If err is not nil or code is not codes.OK, a new RecordOption is returned
|
||||
// with the base attributes of the Instrumentation plus the rpc.grpc.status_code
|
||||
// attribute set to the provided code, and if err is not nil, the error.type
|
||||
// attribute set to the type of the error.
|
||||
func (i *Instrumentation) recordOption(err error, code codes.Code) metric.RecordOption {
|
||||
if err == nil && code == codes.OK {
|
||||
return i.recOpt
|
||||
}
|
||||
|
||||
attrs := get[attribute.KeyValue](measureAttrsPool)
|
||||
defer put(measureAttrsPool, attrs)
|
||||
*attrs = append(*attrs, i.attrs...)
|
||||
|
||||
c := int64(code) // uint32 -> int64.
|
||||
*attrs = append(*attrs, semconv.RPCGRPCStatusCodeKey.Int64(c))
|
||||
if err != nil {
|
||||
*attrs = append(*attrs, semconv.ErrorType(err))
|
||||
}
|
||||
|
||||
// Do not inefficiently make a copy of attrs by using WithAttributes
|
||||
// instead of WithAttributeSet.
|
||||
return metric.WithAttributeSet(attribute.NewSet(*attrs...))
|
||||
}
|
||||
|
||||
// successful returns the number of successfully exported spans out of the n
|
||||
// that were exported based on the provided error.
|
||||
//
|
||||
// If err is nil, n is returned. All spans were successfully exported.
|
||||
//
|
||||
// If err is not nil and not an [internal.PartialSuccess] error, 0 is returned.
|
||||
// It is assumed all spans failed to be exported.
|
||||
//
|
||||
// If err is an [internal.PartialSuccess] error, the number of successfully
|
||||
// exported spans is computed by subtracting the RejectedItems field from n. If
|
||||
// RejectedItems is negative, n is returned. If RejectedItems is greater than
|
||||
// n, 0 is returned.
|
||||
func successful(n int64, err error) int64 {
|
||||
if err == nil {
|
||||
return n // All spans successfully exported.
|
||||
}
|
||||
// Split rejection calculation so successful is inlinable.
|
||||
return n - rejected(n, err)
|
||||
}
|
||||
|
||||
var errPartialPool = &sync.Pool{
|
||||
New: func() any { return new(internal.PartialSuccess) },
|
||||
}
|
||||
|
||||
// rejected returns how many out of the n spans exporter were rejected based on
|
||||
// the provided non-nil err.
|
||||
func rejected(n int64, err error) int64 {
|
||||
ps := errPartialPool.Get().(*internal.PartialSuccess)
|
||||
defer errPartialPool.Put(ps)
|
||||
// Check for partial success.
|
||||
if errors.As(err, ps) {
|
||||
// Bound RejectedItems to [0, n]. This should not be needed,
|
||||
// but be defensive as this is from an external source.
|
||||
return min(max(ps.RejectedItems, 0), n)
|
||||
}
|
||||
return n // All spans rejected.
|
||||
}
|
||||
@@ -0,0 +1,357 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package observ_test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/observ"
|
||||
mapi "go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
|
||||
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
|
||||
)
|
||||
|
||||
const (
|
||||
ID = 0
|
||||
ServerAddr = "localhost"
|
||||
ServerPort = 4317
|
||||
)
|
||||
|
||||
var Target = "dns://" + ServerAddr + ":" + strconv.Itoa(ServerPort)
|
||||
|
||||
var Scope = instrumentation.Scope{
|
||||
Name: observ.ScopeName,
|
||||
Version: observ.Version,
|
||||
SchemaURL: observ.SchemaURL,
|
||||
}
|
||||
|
||||
type errMeterProvider struct {
|
||||
mapi.MeterProvider
|
||||
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter {
|
||||
return &errMeter{err: m.err}
|
||||
}
|
||||
|
||||
type errMeter struct {
|
||||
mapi.Meter
|
||||
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *errMeter) Int64UpDownCounter(string, ...mapi.Int64UpDownCounterOption) (mapi.Int64UpDownCounter, error) {
|
||||
return nil, m.err
|
||||
}
|
||||
|
||||
func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) {
|
||||
return nil, m.err
|
||||
}
|
||||
|
||||
func (m *errMeter) Float64Histogram(string, ...mapi.Float64HistogramOption) (mapi.Float64Histogram, error) {
|
||||
return nil, m.err
|
||||
}
|
||||
|
||||
func TestNewInstrumentationObservabilityErrors(t *testing.T) {
|
||||
orig := otel.GetMeterProvider()
|
||||
t.Cleanup(func() { otel.SetMeterProvider(orig) })
|
||||
mp := &errMeterProvider{err: assert.AnError}
|
||||
otel.SetMeterProvider(mp)
|
||||
|
||||
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||
|
||||
_, err := observ.NewInstrumentation(ID, Target)
|
||||
require.ErrorIs(t, err, assert.AnError, "new instrument errors")
|
||||
|
||||
assert.ErrorContains(t, err, "inflight metric")
|
||||
assert.ErrorContains(t, err, "span exported metric")
|
||||
assert.ErrorContains(t, err, "operation duration metric")
|
||||
}
|
||||
|
||||
func TestNewInstrumentationObservabilityDisabled(t *testing.T) {
|
||||
// Do not set OTEL_GO_X_OBSERVABILITY.
|
||||
got, err := observ.NewInstrumentation(ID, Target)
|
||||
assert.NoError(t, err)
|
||||
assert.Nil(t, got)
|
||||
}
|
||||
|
||||
func setup(t *testing.T) (*observ.Instrumentation, func() metricdata.ScopeMetrics) {
|
||||
t.Helper()
|
||||
|
||||
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||
|
||||
original := otel.GetMeterProvider()
|
||||
t.Cleanup(func() { otel.SetMeterProvider(original) })
|
||||
|
||||
r := metric.NewManualReader()
|
||||
mp := metric.NewMeterProvider(metric.WithReader(r))
|
||||
otel.SetMeterProvider(mp)
|
||||
|
||||
inst, err := observ.NewInstrumentation(ID, Target)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, inst)
|
||||
|
||||
return inst, func() metricdata.ScopeMetrics {
|
||||
var rm metricdata.ResourceMetrics
|
||||
require.NoError(t, r.Collect(t.Context(), &rm))
|
||||
|
||||
require.Len(t, rm.ScopeMetrics, 1)
|
||||
return rm.ScopeMetrics[0]
|
||||
}
|
||||
}
|
||||
|
||||
func baseAttrs(err error) []attribute.KeyValue {
|
||||
attrs := []attribute.KeyValue{
|
||||
semconv.OTelComponentName(observ.ComponentName(ID)),
|
||||
semconv.OTelComponentTypeOtlpGRPCSpanExporter,
|
||||
semconv.ServerAddress(ServerAddr),
|
||||
semconv.ServerPort(ServerPort),
|
||||
}
|
||||
if err != nil {
|
||||
attrs = append(attrs, semconv.ErrorType(err))
|
||||
}
|
||||
return attrs
|
||||
}
|
||||
|
||||
func set(err error) attribute.Set {
|
||||
return attribute.NewSet(baseAttrs(err)...)
|
||||
}
|
||||
|
||||
func spanInflight() metricdata.Metrics {
|
||||
return metricdata.Metrics{
|
||||
Name: otelconv.SDKExporterSpanInflight{}.Name(),
|
||||
Description: otelconv.SDKExporterSpanInflight{}.Description(),
|
||||
Unit: otelconv.SDKExporterSpanInflight{}.Unit(),
|
||||
Data: metricdata.Sum[int64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{Attributes: set(nil), Value: 0},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func spanExported(success, total int64, err error) metricdata.Metrics {
|
||||
dp := []metricdata.DataPoint[int64]{
|
||||
{Attributes: set(nil), Value: success},
|
||||
}
|
||||
if err != nil {
|
||||
dp = append(dp, metricdata.DataPoint[int64]{
|
||||
Attributes: set(err),
|
||||
Value: total - success,
|
||||
})
|
||||
}
|
||||
return metricdata.Metrics{
|
||||
Name: otelconv.SDKExporterSpanExported{}.Name(),
|
||||
Description: otelconv.SDKExporterSpanExported{}.Description(),
|
||||
Unit: otelconv.SDKExporterSpanExported{}.Unit(),
|
||||
Data: metricdata.Sum[int64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: true,
|
||||
DataPoints: dp,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func operationDuration(err error) metricdata.Metrics {
|
||||
rpcSet := func(err error) attribute.Set {
|
||||
c := int64(status.Code(err))
|
||||
return attribute.NewSet(append(
|
||||
[]attribute.KeyValue{
|
||||
semconv.RPCGRPCStatusCodeKey.Int64(c),
|
||||
},
|
||||
baseAttrs(err)...,
|
||||
)...)
|
||||
}
|
||||
return metricdata.Metrics{
|
||||
Name: otelconv.SDKExporterOperationDuration{}.Name(),
|
||||
Description: otelconv.SDKExporterOperationDuration{}.Description(),
|
||||
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
|
||||
Data: metricdata.Histogram[float64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
DataPoints: []metricdata.HistogramDataPoint[float64]{
|
||||
{Attributes: rpcSet(err)},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func assertMetrics(t *testing.T, got metricdata.ScopeMetrics, spans, success int64, err error) {
|
||||
t.Helper()
|
||||
|
||||
assert.Equal(t, Scope, got.Scope, "unexpected scope")
|
||||
|
||||
m := got.Metrics
|
||||
require.Len(t, m, 3, "expected 3 metrics")
|
||||
|
||||
o := metricdatatest.IgnoreTimestamp()
|
||||
want := spanInflight()
|
||||
metricdatatest.AssertEqual(t, want, m[0], o)
|
||||
|
||||
want = spanExported(success, spans, err)
|
||||
metricdatatest.AssertEqual(t, want, m[1], o)
|
||||
|
||||
want = operationDuration(err)
|
||||
metricdatatest.AssertEqual(t, want, m[2], o, metricdatatest.IgnoreValue())
|
||||
}
|
||||
|
||||
func TestInstrumentationExportSpans(t *testing.T) {
|
||||
inst, collect := setup(t)
|
||||
|
||||
const n = 10
|
||||
inst.ExportSpans(t.Context(), n).End(nil, codes.OK)
|
||||
|
||||
assertMetrics(t, collect(), n, n, nil)
|
||||
}
|
||||
|
||||
func TestInstrumentationExportSpansAllErrored(t *testing.T) {
|
||||
inst, collect := setup(t)
|
||||
|
||||
const n = 10
|
||||
c := codes.PermissionDenied
|
||||
err := status.Error(c, "go away")
|
||||
inst.ExportSpans(t.Context(), n).End(err, c)
|
||||
|
||||
const success = 0
|
||||
assertMetrics(t, collect(), n, success, err)
|
||||
}
|
||||
|
||||
func TestInstrumentationExportSpansPartialErrored(t *testing.T) {
|
||||
inst, collect := setup(t)
|
||||
|
||||
const n = 10
|
||||
const success = n - 5
|
||||
|
||||
c := codes.Unavailable
|
||||
err := status.Error(c, "temporary failure")
|
||||
err = errors.Join(err, &internal.PartialSuccess{RejectedItems: 5})
|
||||
inst.ExportSpans(t.Context(), n).End(err, c)
|
||||
|
||||
assertMetrics(t, collect(), n, success, err)
|
||||
}
|
||||
|
||||
func TestInstrumentationExportSpansInvalidPartialErrored(t *testing.T) {
|
||||
inst, collect := setup(t)
|
||||
|
||||
const n = 10
|
||||
pErr := &internal.PartialSuccess{RejectedItems: -5}
|
||||
c := codes.Unavailable
|
||||
err := errors.Join(status.Error(c, "temporary"), pErr)
|
||||
inst.ExportSpans(t.Context(), n).End(err, c)
|
||||
|
||||
// Round -5 to 0.
|
||||
success := int64(n) // (n - 0)
|
||||
assertMetrics(t, collect(), n, success, err)
|
||||
|
||||
// Note: the metrics are cumulative, so account for the previous
|
||||
// ExportSpans call.
|
||||
pErr.RejectedItems = n + 5
|
||||
inst.ExportSpans(t.Context(), n).End(err, c)
|
||||
|
||||
// Round n+5 to n.
|
||||
success += 0 // success + (n - n)
|
||||
assertMetrics(t, collect(), n+n, success, err)
|
||||
}
|
||||
|
||||
func TestBaseAttrs(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
target string
|
||||
want []attribute.KeyValue
|
||||
}{
|
||||
{
|
||||
name: "HostAndPort",
|
||||
target: "dns://localhost:4317",
|
||||
want: []attribute.KeyValue{
|
||||
semconv.OTelComponentName(observ.ComponentName(ID)),
|
||||
semconv.OTelComponentTypeOtlpGRPCSpanExporter,
|
||||
semconv.ServerAddress("localhost"),
|
||||
semconv.ServerPort(4317),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Host",
|
||||
target: "dns://localhost",
|
||||
want: []attribute.KeyValue{
|
||||
semconv.OTelComponentName(observ.ComponentName(ID)),
|
||||
semconv.OTelComponentTypeOtlpGRPCSpanExporter,
|
||||
semconv.ServerAddress("localhost"),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Port",
|
||||
target: "dns://:4317",
|
||||
want: []attribute.KeyValue{
|
||||
semconv.OTelComponentName(observ.ComponentName(ID)),
|
||||
semconv.OTelComponentTypeOtlpGRPCSpanExporter,
|
||||
semconv.ServerPort(4317),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Empty",
|
||||
target: "",
|
||||
want: []attribute.KeyValue{
|
||||
semconv.OTelComponentName(observ.ComponentName(ID)),
|
||||
semconv.OTelComponentTypeOtlpGRPCSpanExporter,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Invalid",
|
||||
target: "dns:///:invalid",
|
||||
want: []attribute.KeyValue{
|
||||
semconv.OTelComponentName(observ.ComponentName(ID)),
|
||||
semconv.OTelComponentTypeOtlpGRPCSpanExporter,
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := observ.BaseAttrs(ID, tt.target)
|
||||
assert.Equal(t, tt.want, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkInstrumentationExportSpans(b *testing.B) {
|
||||
setup := func(b *testing.B) *observ.Instrumentation {
|
||||
b.Helper()
|
||||
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||
inst, err := observ.NewInstrumentation(ID, Target)
|
||||
if err != nil {
|
||||
b.Fatalf("failed to create instrumentation: %v", err)
|
||||
}
|
||||
return inst
|
||||
}
|
||||
|
||||
run := func(err error, c codes.Code) func(*testing.B) {
|
||||
return func(b *testing.B) {
|
||||
inst := setup(b)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
inst.ExportSpans(b.Context(), 10).End(err, c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
b.Run("NoError", run(nil, codes.OK))
|
||||
err := &internal.PartialSuccess{RejectedItems: 6}
|
||||
b.Run("PartialError", run(err, codes.Unavailable))
|
||||
b.Run("FullError", run(assert.AnError, codes.Aborted))
|
||||
}
|
||||
@@ -29,6 +29,17 @@ func (ps PartialSuccess) Error() string {
|
||||
return fmt.Sprintf("OTLP partial success: %s (%d %s rejected)", msg, ps.RejectedItems, ps.RejectedKind)
|
||||
}
|
||||
|
||||
// As returns true if ps can be assigned to target and makes the assignment.
|
||||
// Otherwise, it returns false. This supports the errors.As() interface.
|
||||
func (ps PartialSuccess) As(target any) bool {
|
||||
t, ok := target.(*PartialSuccess)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
*t = ps
|
||||
return true
|
||||
}
|
||||
|
||||
// Is supports the errors.Is() interface.
|
||||
func (ps PartialSuccess) Is(err error) bool {
|
||||
_, ok := err.(PartialSuccess)
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal"
|
||||
|
||||
// Version is the current release version of the OpenTelemetry OTLP gRPC trace
|
||||
// exporter in use.
|
||||
const Version = "1.38.0"
|
||||
@@ -29,6 +29,17 @@ func (ps PartialSuccess) Error() string {
|
||||
return fmt.Sprintf("OTLP partial success: %s (%d %s rejected)", msg, ps.RejectedItems, ps.RejectedKind)
|
||||
}
|
||||
|
||||
// As returns true if ps can be assigned to target and makes the assignment.
|
||||
// Otherwise, it returns false. This supports the errors.As() interface.
|
||||
func (ps PartialSuccess) As(target any) bool {
|
||||
t, ok := target.(*PartialSuccess)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
*t = ps
|
||||
return true
|
||||
}
|
||||
|
||||
// Is supports the errors.Is() interface.
|
||||
func (ps PartialSuccess) Is(err error) bool {
|
||||
_, ok := err.(PartialSuccess)
|
||||
|
||||
@@ -29,6 +29,17 @@ func (ps PartialSuccess) Error() string {
|
||||
return fmt.Sprintf("OTLP partial success: %s (%d %s rejected)", msg, ps.RejectedItems, ps.RejectedKind)
|
||||
}
|
||||
|
||||
// As returns true if ps can be assigned to target and makes the assignment.
|
||||
// Otherwise, it returns false. This supports the errors.As() interface.
|
||||
func (ps PartialSuccess) As(target any) bool {
|
||||
t, ok := target.(*PartialSuccess)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
*t = ps
|
||||
return true
|
||||
}
|
||||
|
||||
// Is supports the errors.Is() interface.
|
||||
func (ps PartialSuccess) Is(err error) bool {
|
||||
_, ok := err.(PartialSuccess)
|
||||
|
||||
@@ -49,7 +49,9 @@ modules:
|
||||
go.opentelemetry.io/otel/exporters/prometheus:
|
||||
version-refs:
|
||||
- ./exporters/prometheus/internal/version.go
|
||||
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc:
|
||||
version-refs:
|
||||
- ./exporters/otlp/otlplog/otlploggrpc/internal/version.go
|
||||
- ./exporters/otlp/otlplog/otlploggrpc/internal/version.go
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc:
|
||||
version-refs:
|
||||
- ./exporters/otlp/otlptrace/otlptracegrpc/internal/version.go
|
||||
|
||||
Reference in New Issue
Block a user