You've already forked opentelemetry-go
							
							
				mirror of
				https://github.com/open-telemetry/opentelemetry-go.git
				synced 2025-10-31 00:07:40 +02:00 
			
		
		
		
	Add the internal/observ pkg to otlptracehttp (#7480)
				
					
				
			Part of #7006 Contains TODOs to be resolved based on merge order of this and #7479. ### Benchmarks ```console > benchstat bmark.results goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/observ cpu: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz │ bmark.results │ │ sec/op │ InstrumentationExportSpans/NoError-8 149.7n ± 3% InstrumentationExportSpans/PartialError-8 1.774µ ± 9% InstrumentationExportSpans/FullError-8 1.743µ ± 4% geomean 773.3n │ bmark.results │ │ B/op │ InstrumentationExportSpans/NoError-8 0.000 ± 0% InstrumentationExportSpans/PartialError-8 753.0 ± 0% InstrumentationExportSpans/FullError-8 753.0 ± 0% geomean ¹ ¹ summaries must be >0 to compute geomean │ bmark.results │ │ allocs/op │ InstrumentationExportSpans/NoError-8 0.000 ± 0% InstrumentationExportSpans/PartialError-8 4.000 ± 0% InstrumentationExportSpans/FullError-8 4.000 ± 0% geomean ¹ ¹ summaries must be >0 to compute geomean ```
This commit is contained in:
		| @@ -4,10 +4,13 @@ go 1.24.0 | ||||
|  | ||||
| require ( | ||||
| 	github.com/cenkalti/backoff/v5 v5.0.3 | ||||
| 	github.com/go-logr/logr v1.4.3 | ||||
| 	github.com/stretchr/testify v1.11.1 | ||||
| 	go.opentelemetry.io/otel v1.38.0 | ||||
| 	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 | ||||
| 	go.opentelemetry.io/otel/metric v1.38.0 | ||||
| 	go.opentelemetry.io/otel/sdk v1.38.0 | ||||
| 	go.opentelemetry.io/otel/sdk/metric v1.38.0 | ||||
| 	go.opentelemetry.io/otel/trace v1.38.0 | ||||
| 	go.opentelemetry.io/proto/otlp v1.8.0 | ||||
| 	google.golang.org/grpc v1.76.0 | ||||
| @@ -16,13 +19,11 @@ require ( | ||||
|  | ||||
| require ( | ||||
| 	github.com/davecgh/go-spew v1.1.1 // indirect | ||||
| 	github.com/go-logr/logr v1.4.3 // indirect | ||||
| 	github.com/go-logr/stdr v1.2.2 // indirect | ||||
| 	github.com/google/uuid v1.6.0 // indirect | ||||
| 	github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect | ||||
| 	github.com/pmezard/go-difflib v1.0.0 // indirect | ||||
| 	go.opentelemetry.io/auto/sdk v1.2.1 // indirect | ||||
| 	go.opentelemetry.io/otel/metric v1.38.0 // indirect | ||||
| 	golang.org/x/net v0.45.0 // indirect | ||||
| 	golang.org/x/sys v0.37.0 // indirect | ||||
| 	golang.org/x/text v0.30.0 // indirect | ||||
|   | ||||
| @@ -0,0 +1,397 @@ | ||||
| // Copyright The OpenTelemetry Authors | ||||
| // SPDX-License-Identifier: Apache-2.0 | ||||
|  | ||||
| // Package observ provides experimental observability instrumentation for the | ||||
| // otlptracehttp exporter. | ||||
| package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/observ" | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"net" | ||||
| 	"net/http" | ||||
| 	"net/netip" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel" | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| 	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal" | ||||
| 	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/x" | ||||
| 	"go.opentelemetry.io/otel/internal/global" | ||||
| 	"go.opentelemetry.io/otel/metric" | ||||
| 	semconv "go.opentelemetry.io/otel/semconv/v1.37.0" | ||||
| 	"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	// ScopeName is the unique name of the meter used for instrumentation. | ||||
| 	ScopeName = "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/observ" | ||||
|  | ||||
| 	// SchemaURL is the schema URL of the metrics produced by this | ||||
| 	// instrumentation. | ||||
| 	SchemaURL = semconv.SchemaURL | ||||
|  | ||||
| 	// Version is the current version of this instrumentation. | ||||
| 	// | ||||
| 	// This matches the version of the exporter. | ||||
| 	Version = internal.Version | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	measureAttrsPool = &sync.Pool{ | ||||
| 		New: func() any { | ||||
| 			const n = 1 + // component.name | ||||
| 				1 + // component.type | ||||
| 				1 + // server.addr | ||||
| 				1 + // server.port | ||||
| 				1 + // error.type | ||||
| 				1 // http.response.status_code | ||||
| 			s := make([]attribute.KeyValue, 0, n) | ||||
| 			// Return a pointer to a slice instead of a slice itself | ||||
| 			// to avoid allocations on every call. | ||||
| 			return &s | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	addOptPool = &sync.Pool{ | ||||
| 		New: func() any { | ||||
| 			const n = 1 // WithAttributeSet | ||||
| 			o := make([]metric.AddOption, 0, n) | ||||
| 			return &o | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	recordOptPool = &sync.Pool{ | ||||
| 		New: func() any { | ||||
| 			const n = 1 // WithAttributeSet | ||||
| 			o := make([]metric.RecordOption, 0, n) | ||||
| 			return &o | ||||
| 		}, | ||||
| 	} | ||||
| ) | ||||
|  | ||||
| func get[T any](p *sync.Pool) *[]T { return p.Get().(*[]T) } | ||||
|  | ||||
| func put[T any](p *sync.Pool, s *[]T) { | ||||
| 	*s = (*s)[:0] // Reset. | ||||
| 	p.Put(s) | ||||
| } | ||||
|  | ||||
| // ComponentName returns the component name for the exporter with the | ||||
| // provided ID. | ||||
| func ComponentName(id int64) string { | ||||
| 	t := semconv.OTelComponentTypeOtlpHTTPSpanExporter.Value.AsString() | ||||
| 	return fmt.Sprintf("%s/%d", t, id) | ||||
| } | ||||
|  | ||||
| // Instrumentation is experimental instrumentation for the exporter. | ||||
| type Instrumentation struct { | ||||
| 	inflightSpans metric.Int64UpDownCounter | ||||
| 	exportedSpans metric.Int64Counter | ||||
| 	opDuration    metric.Float64Histogram | ||||
|  | ||||
| 	attrs  []attribute.KeyValue | ||||
| 	addOpt metric.AddOption | ||||
| 	recOpt metric.RecordOption | ||||
| } | ||||
|  | ||||
| // NewInstrumentation returns instrumentation for an OTLP over HTTP trace | ||||
| // exporter with the provided ID and endpoint. It uses the global | ||||
| // MeterProvider to create the instrumentation. | ||||
| // | ||||
| // The id should be the unique exporter instance ID. It is used | ||||
| // to set the "component.name" attribute. | ||||
| // | ||||
| // The endpoint is the HTTP endpoint the exporter is exporting to. | ||||
| // | ||||
| // If the experimental observability is disabled, nil is returned. | ||||
| func NewInstrumentation(id int64, endpoint string) (*Instrumentation, error) { | ||||
| 	if !x.Observability.Enabled() { | ||||
| 		return nil, nil | ||||
| 	} | ||||
|  | ||||
| 	attrs := BaseAttrs(id, endpoint) | ||||
| 	i := &Instrumentation{ | ||||
| 		attrs:  attrs, | ||||
| 		addOpt: metric.WithAttributeSet(attribute.NewSet(attrs...)), | ||||
|  | ||||
| 		// Do not modify attrs (NewSet sorts in-place), make a new slice. | ||||
| 		recOpt: metric.WithAttributeSet(attribute.NewSet(append( | ||||
| 			// Default to OK status code (200). | ||||
| 			[]attribute.KeyValue{semconv.HTTPResponseStatusCode(http.StatusOK)}, | ||||
| 			attrs..., | ||||
| 		)...)), | ||||
| 	} | ||||
|  | ||||
| 	mp := otel.GetMeterProvider() | ||||
| 	m := mp.Meter( | ||||
| 		ScopeName, | ||||
| 		metric.WithInstrumentationVersion(Version), | ||||
| 		metric.WithSchemaURL(SchemaURL), | ||||
| 	) | ||||
|  | ||||
| 	var err error | ||||
|  | ||||
| 	inflightSpans, e := otelconv.NewSDKExporterSpanInflight(m) | ||||
| 	if e != nil { | ||||
| 		e = fmt.Errorf("failed to create span inflight metric: %w", e) | ||||
| 		err = errors.Join(err, e) | ||||
| 	} | ||||
| 	i.inflightSpans = inflightSpans.Inst() | ||||
|  | ||||
| 	exportedSpans, e := otelconv.NewSDKExporterSpanExported(m) | ||||
| 	if e != nil { | ||||
| 		e = fmt.Errorf("failed to create span exported metric: %w", e) | ||||
| 		err = errors.Join(err, e) | ||||
| 	} | ||||
| 	i.exportedSpans = exportedSpans.Inst() | ||||
|  | ||||
| 	opDuration, e := otelconv.NewSDKExporterOperationDuration(m) | ||||
| 	if e != nil { | ||||
| 		e = fmt.Errorf("failed to create operation duration metric: %w", e) | ||||
| 		err = errors.Join(err, e) | ||||
| 	} | ||||
| 	i.opDuration = opDuration.Inst() | ||||
|  | ||||
| 	return i, err | ||||
| } | ||||
|  | ||||
| // BaseAttrs returns the base attributes for the exporter with the provided ID | ||||
| // and endpoint. | ||||
| // | ||||
| // The id should be the unique exporter instance ID. It is used | ||||
| // to set the "component.name" attribute. | ||||
| // | ||||
| // The endpoint is the HTTP endpoint the exporter is exporting to. It should be | ||||
| // in the format "host:port" or a full URL. | ||||
| func BaseAttrs(id int64, endpoint string) []attribute.KeyValue { | ||||
| 	host, port, err := parseEndpoint(endpoint) | ||||
| 	if err != nil || (host == "" && port < 0) { | ||||
| 		if err != nil { | ||||
| 			global.Debug("failed to parse endpoint", "endpoint", endpoint, "error", err) | ||||
| 		} | ||||
| 		return []attribute.KeyValue{ | ||||
| 			semconv.OTelComponentName(ComponentName(id)), | ||||
| 			semconv.OTelComponentTypeOtlpHTTPSpanExporter, | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Do not use append so the slice is exactly allocated. | ||||
|  | ||||
| 	if port < 0 { | ||||
| 		return []attribute.KeyValue{ | ||||
| 			semconv.OTelComponentName(ComponentName(id)), | ||||
| 			semconv.OTelComponentTypeOtlpHTTPSpanExporter, | ||||
| 			semconv.ServerAddress(host), | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if host == "" { | ||||
| 		return []attribute.KeyValue{ | ||||
| 			semconv.OTelComponentName(ComponentName(id)), | ||||
| 			semconv.OTelComponentTypeOtlpHTTPSpanExporter, | ||||
| 			semconv.ServerPort(port), | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return []attribute.KeyValue{ | ||||
| 		semconv.OTelComponentName(ComponentName(id)), | ||||
| 		semconv.OTelComponentTypeOtlpHTTPSpanExporter, | ||||
| 		semconv.ServerAddress(host), | ||||
| 		semconv.ServerPort(port), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // parseEndpoint parses the host and port from endpoint that has the form | ||||
| // "host[:port]", or it returns an error if the endpoint is not parsable. | ||||
| // | ||||
| // If no port is specified, -1 is returned. | ||||
| // | ||||
| // If no host is specified, an empty string is returned. | ||||
| func parseEndpoint(endpoint string) (string, int, error) { | ||||
| 	// First check if the endpoint is just an IP address. | ||||
| 	if ip := parseIP(endpoint); ip != "" { | ||||
| 		return ip, -1, nil | ||||
| 	} | ||||
|  | ||||
| 	// If there's no colon, there is no port (IPv6 with no port checked above). | ||||
| 	if !strings.Contains(endpoint, ":") { | ||||
| 		return endpoint, -1, nil | ||||
| 	} | ||||
|  | ||||
| 	// Otherwise, parse as host:port. | ||||
| 	host, portStr, err := net.SplitHostPort(endpoint) | ||||
| 	if err != nil { | ||||
| 		return "", -1, fmt.Errorf("invalid host:port %q: %w", endpoint, err) | ||||
| 	} | ||||
|  | ||||
| 	const base, bitSize = 10, 16 | ||||
| 	port16, err := strconv.ParseUint(portStr, base, bitSize) | ||||
| 	if err != nil { | ||||
| 		return "", -1, fmt.Errorf("invalid port %q: %w", portStr, err) | ||||
| 	} | ||||
| 	port := int(port16) // port is guaranteed to be in the range [0, 65535]. | ||||
|  | ||||
| 	return host, port, nil | ||||
| } | ||||
|  | ||||
| // parseIP attempts to parse the entire endpoint as an IP address. | ||||
| // It returns the normalized string form of the IP if successful, | ||||
| // or an empty string if parsing fails. | ||||
| func parseIP(ip string) string { | ||||
| 	// Strip leading and trailing brackets for IPv6 addresses. | ||||
| 	if len(ip) >= 2 && ip[0] == '[' && ip[len(ip)-1] == ']' { | ||||
| 		ip = ip[1 : len(ip)-1] | ||||
| 	} | ||||
| 	addr, err := netip.ParseAddr(ip) | ||||
| 	if err != nil { | ||||
| 		return "" | ||||
| 	} | ||||
| 	// Return the normalized string form of the IP. | ||||
| 	return addr.String() | ||||
| } | ||||
|  | ||||
| // ExportSpans instruments the UploadTraces method of the client. It returns an | ||||
| // [ExportOp] that must have its [ExportOp.End] method called when the | ||||
| // operation end. | ||||
| func (i *Instrumentation) ExportSpans(ctx context.Context, nSpans int) ExportOp { | ||||
| 	start := time.Now() | ||||
|  | ||||
| 	addOpt := get[metric.AddOption](addOptPool) | ||||
| 	defer put(addOptPool, addOpt) | ||||
| 	*addOpt = append(*addOpt, i.addOpt) | ||||
| 	i.inflightSpans.Add(ctx, int64(nSpans), *addOpt...) | ||||
|  | ||||
| 	return ExportOp{ | ||||
| 		ctx:    ctx, | ||||
| 		start:  start, | ||||
| 		nSpans: int64(nSpans), | ||||
| 		inst:   i, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // ExportOp tracks the export operation being observed by | ||||
| // [Instrumentation.ExportSpans]. | ||||
| type ExportOp struct { | ||||
| 	ctx    context.Context | ||||
| 	start  time.Time | ||||
| 	nSpans int64 | ||||
|  | ||||
| 	inst *Instrumentation | ||||
| } | ||||
|  | ||||
| // End completes the observation of the operation being observed by a call to | ||||
| // [Instrumentation.ExportSpans]. | ||||
| // | ||||
| // Any error that is encountered is provided as err. | ||||
| // The HTTP status code from the response is provided as status. | ||||
| // | ||||
| // If err is not nil, all spans will be recorded as failures unless error is of | ||||
| // type [internal.PartialSuccess]. In the case of a PartialSuccess, the number | ||||
| // of successfully exported spans will be determined by inspecting the | ||||
| // RejectedItems field of the PartialSuccess. | ||||
| func (e ExportOp) End(err error, status int) { | ||||
| 	addOpt := get[metric.AddOption](addOptPool) | ||||
| 	defer put(addOptPool, addOpt) | ||||
| 	*addOpt = append(*addOpt, e.inst.addOpt) | ||||
|  | ||||
| 	e.inst.inflightSpans.Add(e.ctx, -e.nSpans, *addOpt...) | ||||
|  | ||||
| 	success := successful(e.nSpans, err) | ||||
| 	// Record successfully exported spans, even if the value is 0 which are | ||||
| 	// meaningful to distribution aggregations. | ||||
| 	e.inst.exportedSpans.Add(e.ctx, success, *addOpt...) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		attrs := get[attribute.KeyValue](measureAttrsPool) | ||||
| 		defer put(measureAttrsPool, attrs) | ||||
| 		*attrs = append(*attrs, e.inst.attrs...) | ||||
| 		*attrs = append(*attrs, semconv.ErrorType(err)) | ||||
|  | ||||
| 		// Do not inefficiently make a copy of attrs by using | ||||
| 		// WithAttributes instead of WithAttributeSet. | ||||
| 		o := metric.WithAttributeSet(attribute.NewSet(*attrs...)) | ||||
| 		// Reset addOpt with new attribute set. | ||||
| 		*addOpt = append((*addOpt)[:0], o) | ||||
|  | ||||
| 		e.inst.exportedSpans.Add(e.ctx, e.nSpans-success, *addOpt...) | ||||
| 	} | ||||
|  | ||||
| 	recOpt := get[metric.RecordOption](recordOptPool) | ||||
| 	defer put(recordOptPool, recOpt) | ||||
| 	*recOpt = append(*recOpt, e.inst.recordOption(err, status)) | ||||
|  | ||||
| 	d := time.Since(e.start).Seconds() | ||||
| 	e.inst.opDuration.Record(e.ctx, d, *recOpt...) | ||||
| } | ||||
|  | ||||
| // recordOption returns a RecordOption with attributes representing the | ||||
| // outcome of the operation being recorded. | ||||
| // | ||||
| // If err is nil and status is 200, the default recOpt of the | ||||
| // Instrumentation is returned. | ||||
| // | ||||
| // Otherwise, a new RecordOption is returned with the base attributes of the | ||||
| // Instrumentation plus the http.response.status_code attribute set to the | ||||
| // provided status, and if err is not nil, the error.type attribute set | ||||
| // to the type of the error. | ||||
| func (i *Instrumentation) recordOption(err error, status int) metric.RecordOption { | ||||
| 	if err == nil && status == http.StatusOK { | ||||
| 		return i.recOpt | ||||
| 	} | ||||
|  | ||||
| 	attrs := get[attribute.KeyValue](measureAttrsPool) | ||||
| 	defer put(measureAttrsPool, attrs) | ||||
| 	*attrs = append(*attrs, i.attrs...) | ||||
|  | ||||
| 	*attrs = append(*attrs, semconv.HTTPResponseStatusCode(status)) | ||||
| 	if err != nil { | ||||
| 		*attrs = append(*attrs, semconv.ErrorType(err)) | ||||
| 	} | ||||
|  | ||||
| 	// Do not inefficiently make a copy of attrs by using WithAttributes | ||||
| 	// instead of WithAttributeSet. | ||||
| 	return metric.WithAttributeSet(attribute.NewSet(*attrs...)) | ||||
| } | ||||
|  | ||||
| // successful returns the number of successfully exported spans out of the n | ||||
| // that were exported based on the provided error. | ||||
| // | ||||
| // If err is nil, n is returned. All spans were successfully exported. | ||||
| // | ||||
| // If err is not nil and not an [internal.PartialSuccess] error, 0 is returned. | ||||
| // It is assumed all spans failed to be exported. | ||||
| // | ||||
| // If err is an [internal.PartialSuccess] error, the number of successfully | ||||
| // exported spans is computed by subtracting the RejectedItems field from n. If | ||||
| // RejectedItems is negative, n is returned. If RejectedItems is greater than | ||||
| // n, 0 is returned. | ||||
| func successful(n int64, err error) int64 { | ||||
| 	if err == nil { | ||||
| 		return n // All spans successfully exported. | ||||
| 	} | ||||
| 	// Split rejection calculation so successful is inlinable. | ||||
| 	return n - rejected(n, err) | ||||
| } | ||||
|  | ||||
| var errPartialPool = &sync.Pool{ | ||||
| 	New: func() any { return new(internal.PartialSuccess) }, | ||||
| } | ||||
|  | ||||
| // rejected returns how many out of the n spans exporter were rejected based on | ||||
| // the provided non-nil err. | ||||
| func rejected(n int64, err error) int64 { | ||||
| 	ps := errPartialPool.Get().(*internal.PartialSuccess) | ||||
| 	defer errPartialPool.Put(ps) | ||||
| 	// Check for partial success. | ||||
| 	if errors.As(err, ps) { | ||||
| 		// Bound RejectedItems to [0, n]. This should not be needed, | ||||
| 		// but be defensive as this is from an external source. | ||||
| 		return min(max(ps.RejectedItems, 0), n) | ||||
| 	} | ||||
| 	return n // All spans rejected. | ||||
| } | ||||
| @@ -0,0 +1,386 @@ | ||||
| // Copyright The OpenTelemetry Authors | ||||
| // SPDX-License-Identifier: Apache-2.0 | ||||
|  | ||||
| package observ_test | ||||
|  | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"net/http" | ||||
| 	"strconv" | ||||
| 	"testing" | ||||
|  | ||||
| 	"github.com/go-logr/logr" | ||||
| 	"github.com/go-logr/logr/testr" | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| 	"github.com/stretchr/testify/require" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel" | ||||
| 	"go.opentelemetry.io/otel/attribute" | ||||
| 	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal" | ||||
| 	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/observ" | ||||
| 	"go.opentelemetry.io/otel/internal/global" | ||||
| 	mapi "go.opentelemetry.io/otel/metric" | ||||
| 	"go.opentelemetry.io/otel/sdk/instrumentation" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/metricdata" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" | ||||
| 	semconv "go.opentelemetry.io/otel/semconv/v1.37.0" | ||||
| 	"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	ID         = 0 | ||||
| 	ServerAddr = "localhost" | ||||
| 	ServerPort = 4318 | ||||
| ) | ||||
|  | ||||
| var Endpoint = ServerAddr + ":" + strconv.Itoa(ServerPort) | ||||
|  | ||||
| var Scope = instrumentation.Scope{ | ||||
| 	Name:      observ.ScopeName, | ||||
| 	Version:   observ.Version, | ||||
| 	SchemaURL: observ.SchemaURL, | ||||
| } | ||||
|  | ||||
| type errMeterProvider struct { | ||||
| 	mapi.MeterProvider | ||||
|  | ||||
| 	err error | ||||
| } | ||||
|  | ||||
| func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter { | ||||
| 	return &errMeter{err: m.err} | ||||
| } | ||||
|  | ||||
| type errMeter struct { | ||||
| 	mapi.Meter | ||||
|  | ||||
| 	err error | ||||
| } | ||||
|  | ||||
| func (m *errMeter) Int64UpDownCounter(string, ...mapi.Int64UpDownCounterOption) (mapi.Int64UpDownCounter, error) { | ||||
| 	return nil, m.err | ||||
| } | ||||
|  | ||||
| func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) { | ||||
| 	return nil, m.err | ||||
| } | ||||
|  | ||||
| func (m *errMeter) Float64Histogram(string, ...mapi.Float64HistogramOption) (mapi.Float64Histogram, error) { | ||||
| 	return nil, m.err | ||||
| } | ||||
|  | ||||
| func TestNewInstrumentationObservabilityErrors(t *testing.T) { | ||||
| 	orig := otel.GetMeterProvider() | ||||
| 	t.Cleanup(func() { otel.SetMeterProvider(orig) }) | ||||
| 	mp := &errMeterProvider{err: assert.AnError} | ||||
| 	otel.SetMeterProvider(mp) | ||||
|  | ||||
| 	t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") | ||||
|  | ||||
| 	_, err := observ.NewInstrumentation(ID, Endpoint) | ||||
| 	require.ErrorIs(t, err, assert.AnError, "new instrument errors") | ||||
|  | ||||
| 	assert.ErrorContains(t, err, "inflight metric") | ||||
| 	assert.ErrorContains(t, err, "span exported metric") | ||||
| 	assert.ErrorContains(t, err, "operation duration metric") | ||||
| } | ||||
|  | ||||
| func TestNewInstrumentationObservabilityDisabled(t *testing.T) { | ||||
| 	// Do not set OTEL_GO_X_OBSERVABILITY. | ||||
| 	got, err := observ.NewInstrumentation(ID, Endpoint) | ||||
| 	assert.NoError(t, err) | ||||
| 	assert.Nil(t, got) | ||||
| } | ||||
|  | ||||
| func setup(t *testing.T) (*observ.Instrumentation, func() metricdata.ScopeMetrics) { | ||||
| 	t.Helper() | ||||
|  | ||||
| 	t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") | ||||
|  | ||||
| 	original := otel.GetMeterProvider() | ||||
| 	t.Cleanup(func() { otel.SetMeterProvider(original) }) | ||||
|  | ||||
| 	r := metric.NewManualReader() | ||||
| 	mp := metric.NewMeterProvider(metric.WithReader(r)) | ||||
| 	otel.SetMeterProvider(mp) | ||||
|  | ||||
| 	inst, err := observ.NewInstrumentation(ID, Endpoint) | ||||
| 	require.NoError(t, err) | ||||
| 	require.NotNil(t, inst) | ||||
|  | ||||
| 	return inst, func() metricdata.ScopeMetrics { | ||||
| 		var rm metricdata.ResourceMetrics | ||||
| 		require.NoError(t, r.Collect(t.Context(), &rm)) | ||||
|  | ||||
| 		require.Len(t, rm.ScopeMetrics, 1) | ||||
| 		return rm.ScopeMetrics[0] | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func baseAttrs(err error) []attribute.KeyValue { | ||||
| 	attrs := []attribute.KeyValue{ | ||||
| 		semconv.OTelComponentName(observ.ComponentName(ID)), | ||||
| 		semconv.OTelComponentTypeOtlpHTTPSpanExporter, | ||||
| 		semconv.ServerAddress(ServerAddr), | ||||
| 		semconv.ServerPort(ServerPort), | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		attrs = append(attrs, semconv.ErrorType(err)) | ||||
| 	} | ||||
| 	return attrs | ||||
| } | ||||
|  | ||||
| func set(err error) attribute.Set { | ||||
| 	return attribute.NewSet(baseAttrs(err)...) | ||||
| } | ||||
|  | ||||
| func spanInflight() metricdata.Metrics { | ||||
| 	return metricdata.Metrics{ | ||||
| 		Name:        otelconv.SDKExporterSpanInflight{}.Name(), | ||||
| 		Description: otelconv.SDKExporterSpanInflight{}.Description(), | ||||
| 		Unit:        otelconv.SDKExporterSpanInflight{}.Unit(), | ||||
| 		Data: metricdata.Sum[int64]{ | ||||
| 			Temporality: metricdata.CumulativeTemporality, | ||||
| 			DataPoints: []metricdata.DataPoint[int64]{ | ||||
| 				{Attributes: set(nil), Value: 0}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func spanExported(success, total int64, err error) metricdata.Metrics { | ||||
| 	dp := []metricdata.DataPoint[int64]{ | ||||
| 		{Attributes: set(nil), Value: success}, | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		dp = append(dp, metricdata.DataPoint[int64]{ | ||||
| 			Attributes: set(err), | ||||
| 			Value:      total - success, | ||||
| 		}) | ||||
| 	} | ||||
| 	return metricdata.Metrics{ | ||||
| 		Name:        otelconv.SDKExporterSpanExported{}.Name(), | ||||
| 		Description: otelconv.SDKExporterSpanExported{}.Description(), | ||||
| 		Unit:        otelconv.SDKExporterSpanExported{}.Unit(), | ||||
| 		Data: metricdata.Sum[int64]{ | ||||
| 			Temporality: metricdata.CumulativeTemporality, | ||||
| 			IsMonotonic: true, | ||||
| 			DataPoints:  dp, | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func operationDuration(err error, statusCode int) metricdata.Metrics { | ||||
| 	httpSet := func(err error, statusCode int) attribute.Set { | ||||
| 		attrs := baseAttrs(err) | ||||
| 		attrs = append(attrs, semconv.HTTPResponseStatusCode(statusCode)) | ||||
| 		return attribute.NewSet(attrs...) | ||||
| 	} | ||||
| 	return metricdata.Metrics{ | ||||
| 		Name:        otelconv.SDKExporterOperationDuration{}.Name(), | ||||
| 		Description: otelconv.SDKExporterOperationDuration{}.Description(), | ||||
| 		Unit:        otelconv.SDKExporterOperationDuration{}.Unit(), | ||||
| 		Data: metricdata.Histogram[float64]{ | ||||
| 			Temporality: metricdata.CumulativeTemporality, | ||||
| 			DataPoints: []metricdata.HistogramDataPoint[float64]{ | ||||
| 				{Attributes: httpSet(err, statusCode)}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func assertMetrics(t *testing.T, got metricdata.ScopeMetrics, spans, success int64, err error, statusCode int) { | ||||
| 	t.Helper() | ||||
|  | ||||
| 	assert.Equal(t, Scope, got.Scope, "unexpected scope") | ||||
|  | ||||
| 	m := got.Metrics | ||||
| 	require.Len(t, m, 3, "expected 3 metrics") | ||||
|  | ||||
| 	o := metricdatatest.IgnoreTimestamp() | ||||
| 	want := spanInflight() | ||||
| 	metricdatatest.AssertEqual(t, want, m[0], o) | ||||
|  | ||||
| 	want = spanExported(success, spans, err) | ||||
| 	metricdatatest.AssertEqual(t, want, m[1], o) | ||||
|  | ||||
| 	want = operationDuration(err, statusCode) | ||||
| 	metricdatatest.AssertEqual(t, want, m[2], o, metricdatatest.IgnoreValue()) | ||||
| } | ||||
|  | ||||
| func TestInstrumentationExportSpans(t *testing.T) { | ||||
| 	inst, collect := setup(t) | ||||
|  | ||||
| 	const n = 10 | ||||
| 	inst.ExportSpans(t.Context(), n).End(nil, http.StatusOK) | ||||
|  | ||||
| 	assertMetrics(t, collect(), n, n, nil, http.StatusOK) | ||||
| } | ||||
|  | ||||
| func TestInstrumentationExportSpansAllErrored(t *testing.T) { | ||||
| 	inst, collect := setup(t) | ||||
|  | ||||
| 	const n = 10 | ||||
| 	err := errors.New("http error") | ||||
| 	inst.ExportSpans(t.Context(), n).End(err, http.StatusInternalServerError) | ||||
|  | ||||
| 	const success = 0 | ||||
| 	assertMetrics(t, collect(), n, success, err, http.StatusInternalServerError) | ||||
| } | ||||
|  | ||||
| func TestInstrumentationExportSpansPartialErrored(t *testing.T) { | ||||
| 	inst, collect := setup(t) | ||||
|  | ||||
| 	const n = 10 | ||||
| 	const success = n - 5 | ||||
|  | ||||
| 	err := errors.New("partial failure") | ||||
| 	err = errors.Join(err, &internal.PartialSuccess{RejectedItems: 5}) | ||||
| 	inst.ExportSpans(t.Context(), n).End(err, http.StatusOK) | ||||
|  | ||||
| 	assertMetrics(t, collect(), n, success, err, http.StatusOK) | ||||
| } | ||||
|  | ||||
| func TestInstrumentationExportSpansInvalidPartialErrored(t *testing.T) { | ||||
| 	inst, collect := setup(t) | ||||
|  | ||||
| 	const n = 10 | ||||
| 	pErr := &internal.PartialSuccess{RejectedItems: -5} | ||||
| 	err := errors.Join(errors.New("temporary"), pErr) | ||||
| 	inst.ExportSpans(t.Context(), n).End(err, http.StatusServiceUnavailable) | ||||
|  | ||||
| 	// Round -5 to 0. | ||||
| 	success := int64(n) // (n - 0) | ||||
| 	assertMetrics(t, collect(), n, success, err, http.StatusServiceUnavailable) | ||||
|  | ||||
| 	// Note: the metrics are cumulative, so account for the previous | ||||
| 	// ExportSpans call. | ||||
| 	pErr.RejectedItems = n + 5 | ||||
| 	inst.ExportSpans(t.Context(), n).End(err, http.StatusServiceUnavailable) | ||||
|  | ||||
| 	// Round n+5 to n. | ||||
| 	success += 0 // success + (n - n) | ||||
| 	assertMetrics(t, collect(), n+n, success, err, http.StatusServiceUnavailable) | ||||
| } | ||||
|  | ||||
| func TestBaseAttrs(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		endpoint string | ||||
| 		host     string | ||||
| 		port     int | ||||
| 	}{ | ||||
| 		// Empty. | ||||
| 		{endpoint: "", host: "", port: -1}, | ||||
|  | ||||
| 		// Only a port. | ||||
| 		{endpoint: ":4318", host: "", port: 4318}, | ||||
|  | ||||
| 		// Hostname. | ||||
| 		{endpoint: "localhost:4318", host: "localhost", port: 4318}, | ||||
| 		{endpoint: "localhost", host: "localhost", port: -1}, | ||||
|  | ||||
| 		// IPv4 address. | ||||
| 		{endpoint: "127.0.0.1:4318", host: "127.0.0.1", port: 4318}, | ||||
| 		{endpoint: "127.0.0.1", host: "127.0.0.1", port: -1}, | ||||
|  | ||||
| 		// IPv6 address. | ||||
| 		{endpoint: "2001:0db8:85a3:0000:0000:8a2e:0370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1}, | ||||
| 		{endpoint: "2001:db8:85a3:0:0:8a2e:370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1}, | ||||
| 		{endpoint: "2001:db8:85a3::8a2e:370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1}, | ||||
| 		{endpoint: "[2001:db8:85a3::8a2e:370:7334]", host: "2001:db8:85a3::8a2e:370:7334", port: -1}, | ||||
| 		{endpoint: "[::1]:9090", host: "::1", port: 9090}, | ||||
|  | ||||
| 		// Port edge cases. | ||||
| 		{endpoint: "example.com:0", host: "example.com", port: 0}, | ||||
| 		{endpoint: "example.com:65535", host: "example.com", port: 65535}, | ||||
|  | ||||
| 		// Case insensitive. | ||||
| 		{endpoint: "ExAmPlE.COM:8080", host: "ExAmPlE.COM", port: 8080}, | ||||
| 	} | ||||
| 	for _, tt := range tests { | ||||
| 		got := observ.BaseAttrs(ID, tt.endpoint) | ||||
| 		want := []attribute.KeyValue{ | ||||
| 			semconv.OTelComponentName(observ.ComponentName(ID)), | ||||
| 			semconv.OTelComponentTypeOtlpHTTPSpanExporter, | ||||
| 		} | ||||
|  | ||||
| 		if tt.host != "" { | ||||
| 			want = append(want, semconv.ServerAddress(tt.host)) | ||||
| 		} | ||||
| 		if tt.port != -1 { | ||||
| 			want = append(want, semconv.ServerPort(tt.port)) | ||||
| 		} | ||||
| 		assert.Equal(t, want, got) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type logSink struct { | ||||
| 	logr.LogSink | ||||
|  | ||||
| 	level         int | ||||
| 	msg           string | ||||
| 	keysAndValues []any | ||||
| } | ||||
|  | ||||
| func (*logSink) Enabled(int) bool { return true } | ||||
|  | ||||
| func (l *logSink) Info(level int, msg string, keysAndValues ...any) { | ||||
| 	l.level, l.msg, l.keysAndValues = level, msg, keysAndValues | ||||
| 	l.LogSink.Info(level, msg, keysAndValues...) | ||||
| } | ||||
|  | ||||
| func TestBaseAttrsError(t *testing.T) { | ||||
| 	endpoints := []string{ | ||||
| 		"example.com:invalid",   // Non-numeric port. | ||||
| 		"example.com:8080:9090", // Multiple colons in port. | ||||
| 		"example.com:99999",     // Port out of range. | ||||
| 		"example.com:-1",        // Port out of range. | ||||
| 	} | ||||
| 	for _, endpoint := range endpoints { | ||||
| 		l := &logSink{LogSink: testr.New(t).GetSink()} | ||||
| 		t.Cleanup(func(orig logr.Logger) func() { | ||||
| 			global.SetLogger(logr.New(l)) | ||||
| 			return func() { global.SetLogger(orig) } | ||||
| 		}(global.GetLogger())) | ||||
|  | ||||
| 		// Set the logger as global so BaseAttrs can log the error. | ||||
| 		got := observ.BaseAttrs(ID, endpoint) | ||||
| 		want := []attribute.KeyValue{ | ||||
| 			semconv.OTelComponentName(observ.ComponentName(ID)), | ||||
| 			semconv.OTelComponentTypeOtlpHTTPSpanExporter, | ||||
| 		} | ||||
| 		assert.Equal(t, want, got) | ||||
|  | ||||
| 		assert.Equal(t, 8, l.level, "expected Debug log level") | ||||
| 		assert.Equal(t, "failed to parse endpoint", l.msg) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func BenchmarkInstrumentationExportSpans(b *testing.B) { | ||||
| 	setup := func(b *testing.B) *observ.Instrumentation { | ||||
| 		b.Helper() | ||||
| 		b.Setenv("OTEL_GO_X_OBSERVABILITY", "true") | ||||
| 		inst, err := observ.NewInstrumentation(ID, Endpoint) | ||||
| 		if err != nil { | ||||
| 			b.Fatalf("failed to create instrumentation: %v", err) | ||||
| 		} | ||||
| 		return inst | ||||
| 	} | ||||
|  | ||||
| 	run := func(err error, statusCode int) func(*testing.B) { | ||||
| 		return func(b *testing.B) { | ||||
| 			inst := setup(b) | ||||
| 			b.ReportAllocs() | ||||
| 			b.ResetTimer() | ||||
| 			for b.Loop() { | ||||
| 				inst.ExportSpans(b.Context(), 10).End(err, statusCode) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	b.Run("NoError", run(nil, http.StatusOK)) | ||||
| 	err := &internal.PartialSuccess{RejectedItems: 6} | ||||
| 	b.Run("PartialError", run(err, http.StatusOK)) | ||||
| 	b.Run("FullError", run(assert.AnError, http.StatusInternalServerError)) | ||||
| } | ||||
		Reference in New Issue
	
	Block a user