diff --git a/CHANGELOG.md b/CHANGELOG.md index d38a27d04..5c5ef410e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#7486) - Add experimental observability metrics for simple span processor in `go.opentelemetry.io/otel/sdk/trace`. (#7374) - Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#7512) +- Add experimental observability metrics for manual reader in `go.opentelemetry.io/otel/sdk/metric`. (#7524) ### Fixed diff --git a/sdk/metric/internal/observ/instrumentation.go b/sdk/metric/internal/observ/instrumentation.go new file mode 100644 index 000000000..41cfc6bcb --- /dev/null +++ b/sdk/metric/internal/observ/instrumentation.go @@ -0,0 +1,168 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package observ provides experimental observability instrumentation for the +// metric reader. +package observ // import "go.opentelemetry.io/otel/sdk/metric/internal/observ" + +import ( + "context" + "fmt" + "sync" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/internal/x" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" +) + +const ( + // ScopeName is the unique name of the meter used for instrumentation. + ScopeName = "go.opentelemetry.io/otel/sdk/metric/internal/observ" + + // SchemaURL is the schema URL of the metrics produced by this + // instrumentation. + SchemaURL = semconv.SchemaURL +) + +var ( + measureAttrsPool = &sync.Pool{ + New: func() any { + const n = 1 + // component.name + 1 + // component.type + 1 // error.type + s := make([]attribute.KeyValue, 0, n) + // Return a pointer to a slice instead of a slice itself + // to avoid allocations on every call. + return &s + }, + } + + recordOptPool = &sync.Pool{ + New: func() any { + const n = 1 // WithAttributeSet + o := make([]metric.RecordOption, 0, n) + return &o + }, + } +) + +func get[T any](p *sync.Pool) *[]T { return p.Get().(*[]T) } + +func put[T any](p *sync.Pool, s *[]T) { + *s = (*s)[:0] // Reset. + p.Put(s) +} + +// ComponentName returns the component name for the metric reader with the +// provided ComponentType and ID. +func ComponentName(componentType string, id int64) string { + return fmt.Sprintf("%s/%d", componentType, id) +} + +// Instrumentation is experimental instrumentation for the metric reader. +type Instrumentation struct { + colDuration metric.Float64Histogram + + attrs []attribute.KeyValue + recOpt metric.RecordOption +} + +// NewInstrumentation returns instrumentation for metric reader with the provided component +// type (such as periodic and manual metric reader) and ID. It uses the global +// MeterProvider to create the instrumentation. +// +// The id should be the unique metric reader instance ID. It is used +// to set the "component.name" attribute. +// +// If the experimental observability is disabled, nil is returned. +func NewInstrumentation(componentType string, id int64) (*Instrumentation, error) { + if !x.Observability.Enabled() { + return nil, nil + } + + i := &Instrumentation{ + attrs: []attribute.KeyValue{ + semconv.OTelComponentName(ComponentName(componentType, id)), + semconv.OTelComponentTypeKey.String(componentType), + }, + } + + r := attribute.NewSet(i.attrs...) + i.recOpt = metric.WithAttributeSet(r) + + meter := otel.GetMeterProvider().Meter( + ScopeName, + metric.WithInstrumentationVersion(sdk.Version()), + metric.WithSchemaURL(SchemaURL), + ) + + colDuration, err := otelconv.NewSDKMetricReaderCollectionDuration(meter) + if err != nil { + err = fmt.Errorf("failed to create collection duration metric: %w", err) + } + i.colDuration = colDuration.Inst() + + return i, err +} + +// CollectMetrics instruments the collect method of metric reader. It returns an +// [CollectOp] that must have its [CollectOp.End] method called when the +// collection end. +func (i *Instrumentation) CollectMetrics(ctx context.Context) CollectOp { + start := time.Now() + + return CollectOp{ + ctx: ctx, + start: start, + inst: i, + } +} + +// CollectOp tracks the collect operation being observed by +// [Instrumentation.CollectMetrics]. +type CollectOp struct { + ctx context.Context + start time.Time + + inst *Instrumentation +} + +// End completes the observation of the operation being observed by a call to +// [Instrumentation.CollectMetrics]. +// +// Any error that is encountered is provided as err. +func (e CollectOp) End(err error) { + recOpt := get[metric.RecordOption](recordOptPool) + defer put(recordOptPool, recOpt) + *recOpt = append(*recOpt, e.inst.recordOption(err)) + + d := time.Since(e.start).Seconds() + e.inst.colDuration.Record(e.ctx, d, *recOpt...) +} + +// recordOption returns a RecordOption with attributes representing the +// outcome of the collection being recorded. +// +// If err is nil, the default recOpt of the Instrumentation is returned. +// +// Otherwise, a new RecordOption is returned with the base attributes of the +// Instrumentation plus the error.type attribute set to the type of the error. +func (i *Instrumentation) recordOption(err error) metric.RecordOption { + if err == nil { + return i.recOpt + } + + attrs := get[attribute.KeyValue](measureAttrsPool) + defer put(measureAttrsPool, attrs) + *attrs = append(*attrs, i.attrs...) + *attrs = append(*attrs, semconv.ErrorType(err)) + + // Do not inefficiently make a copy of attrs by using WithAttributes + // instead of WithAttributeSet. + return metric.WithAttributeSet(attribute.NewSet(*attrs...)) +} diff --git a/sdk/metric/internal/observ/instrumentation_test.go b/sdk/metric/internal/observ/instrumentation_test.go new file mode 100644 index 000000000..5bbf1f312 --- /dev/null +++ b/sdk/metric/internal/observ/instrumentation_test.go @@ -0,0 +1,217 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package observ_test + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + mapi "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/internal/observ" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" +) + +const ( + ID = int64(42) + ComponentType = "test-reader" +) + +var Scope = instrumentation.Scope{ + Name: observ.ScopeName, + Version: sdk.Version(), + SchemaURL: observ.SchemaURL, +} + +type errMeterProvider struct { + mapi.MeterProvider + err error +} + +func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter { + return &errMeter{err: m.err} +} + +type errMeter struct { + mapi.Meter + err error +} + +func (m *errMeter) Float64Histogram(string, ...mapi.Float64HistogramOption) (mapi.Float64Histogram, error) { + return nil, m.err +} + +func TestNewInstrumentationObservabilityErrors(t *testing.T) { + orig := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(orig) }) + mp := &errMeterProvider{err: assert.AnError} + otel.SetMeterProvider(mp) + + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + _, err := observ.NewInstrumentation(ComponentType, ID) + require.ErrorIs(t, err, assert.AnError, "new instrument errors should be joined") + + assert.ErrorContains(t, err, "collection duration metric") +} + +func TestNewInstrumentationObservabilityDisabled(t *testing.T) { + // Do not set OTEL_GO_X_OBSERVABILITY. + got, err := observ.NewInstrumentation(ComponentType, ID) + assert.NoError(t, err) + assert.Nil(t, got) +} + +// setup installs a ManualReader MeterProvider and returns an instantiated +// Instrumentation plus a collector that returns the single ScopeMetrics group. +func setup(t *testing.T) (*observ.Instrumentation, func() metricdata.ScopeMetrics) { + t.Helper() + + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + original := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(original) }) + + r := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(r)) + otel.SetMeterProvider(mp) + + inst, err := observ.NewInstrumentation(ComponentType, ID) + require.NoError(t, err) + require.NotNil(t, inst) + + return inst, func() metricdata.ScopeMetrics { + var rm metricdata.ResourceMetrics + require.NoError(t, r.Collect(t.Context(), &rm)) + require.Len(t, rm.ScopeMetrics, 1) + return rm.ScopeMetrics[0] + } +} + +func baseAttrs(err error) []attribute.KeyValue { + attrs := []attribute.KeyValue{ + semconv.OTelComponentName(observ.ComponentName(ComponentType, ID)), + semconv.OTelComponentTypeKey.String(ComponentType), + } + if err != nil { + attrs = append(attrs, semconv.ErrorType(err)) + } + return attrs +} + +func set(err error) attribute.Set { + return attribute.NewSet(baseAttrs(err)...) +} + +func collectionDuration(err error) metricdata.Metrics { + return metricdata.Metrics{ + Name: otelconv.SDKMetricReaderCollectionDuration{}.Name(), + Description: otelconv.SDKMetricReaderCollectionDuration{}.Description(), + Unit: otelconv.SDKMetricReaderCollectionDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + {Attributes: set(err)}, + }, + }, + } +} + +func assertCollectionMetrics(t *testing.T, got metricdata.ScopeMetrics, err error) { + t.Helper() + + assert.Equal(t, Scope, got.Scope, "unexpected scope") + + m := got.Metrics + require.Len(t, m, 1, "expected 1 metric (collection duration)") + + want := collectionDuration(err) + metricdatatest.AssertEqual(t, want, m[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) +} + +func TestInstrumentationCollectMetricsSuccess(t *testing.T) { + inst, collect := setup(t) + + inst.CollectMetrics(t.Context()).End(nil) + + assertCollectionMetrics(t, collect(), nil) +} + +func TestInstrumentationCollectMetricsError(t *testing.T) { + inst, collect := setup(t) + + wantErr := assert.AnError + inst.CollectMetrics(t.Context()).End(wantErr) + + assertCollectionMetrics(t, collect(), wantErr) +} + +func TestComponentName(t *testing.T) { + tests := []struct { + componentType string + id int64 + want string + }{ + {componentType: "periodic_metric_reader", id: 0, want: "periodic_metric_reader/0"}, + {componentType: "periodic_metric_reader", id: 1, want: "periodic_metric_reader/1"}, + {componentType: "periodic_metric_reader", id: 42, want: "periodic_metric_reader/42"}, + {componentType: "periodic_metric_reader", id: -1, want: "periodic_metric_reader/-1"}, + {componentType: "manual_metric_reader", id: 0, want: "manual_metric_reader/0"}, + } + + for _, tt := range tests { + got := observ.ComponentName(tt.componentType, tt.id) + assert.Equal(t, tt.want, got) + } +} + +func setupBench(b *testing.B) *observ.Instrumentation { + b.Helper() + b.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + // Set up a proper MeterProvider for benchmarks + original := otel.GetMeterProvider() + b.Cleanup(func() { otel.SetMeterProvider(original) }) + + r := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(r)) + otel.SetMeterProvider(mp) + + inst, err := observ.NewInstrumentation(ComponentType, ID) + if err != nil { + b.Fatalf("failed to create instrumentation: %v", err) + } + if inst == nil { + b.Fatal("expected instrumentation, got nil") + } + return inst +} + +func BenchmarkInstrumentationCollectMetrics(b *testing.B) { + run := func(err error) func(*testing.B) { + inst := setupBench(b) + return func(b *testing.B) { + ctx := b.Context() + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + inst.CollectMetrics(ctx).End(err) + } + } + } + + err := errors.New("benchmark error") + b.Run("NoError", run(nil)) + b.Run("Error", run(err)) +} diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 85d3dc207..5b0630207 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -10,10 +10,18 @@ import ( "sync" "sync/atomic" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/sdk/metric/internal/observ" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) +const ( + // ManualReaderType uniquely identifies the OpenTelemetry Metric Reader component + // being instrumented. + manualReaderType = "go.opentelemetry.io/otel/sdk/metric/metric.ManualReader" +) + // ManualReader is a simple Reader that allows an application to // read metrics on demand. type ManualReader struct { @@ -26,6 +34,8 @@ type ManualReader struct { temporalitySelector TemporalitySelector aggregationSelector AggregationSelector + + inst *observ.Instrumentation } // Compile time check the manualReader implements Reader and is comparable. @@ -39,9 +49,24 @@ func NewManualReader(opts ...ManualReaderOption) *ManualReader { aggregationSelector: cfg.aggregationSelector, } r.externalProducers.Store(cfg.producers) + + var err error + r.inst, err = observ.NewInstrumentation(manualReaderType, nextManualReaderID()) + if err != nil { + otel.Handle(err) + } + return r } +var manualReaderIDCounter atomic.Int64 + +// nextManualReaderID returns an identifier for this manual reader, +// starting with 0 and incrementing by 1 each time it is called. +func nextManualReaderID() int64 { + return manualReaderIDCounter.Add(1) - 1 +} + // register stores the sdkProducer which enables the caller // to read metrics from the SDK on demand. func (mr *ManualReader) register(p sdkProducer) { @@ -93,12 +118,20 @@ func (mr *ManualReader) Shutdown(context.Context) error { // // This method is safe to call concurrently. func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error { + var err error + if mr.inst != nil { + cp := mr.inst.CollectMetrics(ctx) + defer func() { cp.End(err) }() + } + if rm == nil { - return errors.New("manual reader: *metricdata.ResourceMetrics is nil") + err = errors.New("manual reader: *metricdata.ResourceMetrics is nil") + return err } p := mr.sdkProducer.Load() if p == nil { - return ErrReaderNotRegistered + err = ErrReaderNotRegistered + return err } ph, ok := p.(produceHolder) @@ -107,11 +140,11 @@ func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr // this should never happen. In the unforeseen case that this does // happen, return an error instead of panicking so a users code does // not halt in the processes. - err := fmt.Errorf("manual reader: invalid producer: %T", p) + err = fmt.Errorf("manual reader: invalid producer: %T", p) return err } - err := ph.produce(ctx, rm) + err = ph.produce(ctx, rm) if err != nil { return err } diff --git a/sdk/metric/manual_reader_test.go b/sdk/metric/manual_reader_test.go index f245f45c3..ddca50ad8 100644 --- a/sdk/metric/manual_reader_test.go +++ b/sdk/metric/manual_reader_test.go @@ -5,14 +5,24 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" + "fmt" + "strings" "testing" "time" + "github.com/go-logr/logr" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" ) func TestManualReader(t *testing.T) { @@ -112,3 +122,282 @@ func TestManualReaderCollect(t *testing.T) { }) } } + +func TestManualReaderInstrumentation(t *testing.T) { + // Enable SDK observability. + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + // ManualReader under test with a fake producer. + manualReader := NewManualReader() + t.Cleanup(func() { _ = manualReader.Shutdown(t.Context()) }) + manualReader.register(testSDKProducer{}) + + // Exercise a collect (producer data). + var got metricdata.ResourceMetrics + require.NoError(t, manualReader.Collect(t.Context(), &got)) + + // Collect again so we have something to scan through. + var rm metricdata.ResourceMetrics + require.NoError(t, manualReader.Collect(t.Context(), &rm)) + require.NotEmpty(t, rm.ScopeMetrics) + + targetName := otelconv.SDKMetricReaderCollectionDuration{}.Name() + targetDesc := otelconv.SDKMetricReaderCollectionDuration{}.Description() + targetUnit := otelconv.SDKMetricReaderCollectionDuration{}.Unit() + + // Find the SDK reader self-metric anywhere in the collected data. + foundMetric := findMetricByName(&rm, targetName) + + // If not found, explain and skip (this metric is emitted via the *global* MP). + if foundMetric == nil { + t.Skipf("SDK reader self-metric %q not found. It is emitted via the global MeterProvider; "+ + "this test does not install a global MP.", targetName) + return + } + + // Basic identity checks. + assert.Equal(t, targetName, foundMetric.Name) + assert.Equal(t, targetDesc, foundMetric.Description) + assert.Equal(t, targetUnit, foundMetric.Unit) + + // Must be a histogram with cumulative temporality. + hist, ok := foundMetric.Data.(metricdata.Histogram[float64]) + require.True(t, ok, "expected histogram data") + assert.Equal(t, metricdata.CumulativeTemporality, hist.Temporality) + require.NotEmpty(t, hist.DataPoints) + + // Check base attributes on one datapoint (flexibly). + dp := hist.DataPoints[0] + attrs := dp.Attributes.ToSlice() + t.Logf("observability attrs: %v", attrs) + + const expectedComponentType = "go.opentelemetry.io/otel/sdk/metric/metric.ManualReader" + + var hasName, hasType bool + for _, a := range attrs { + switch a.Key { + case "otel.component.name": + if s := a.Value.AsString(); s != "" && strings.Contains(s, "metric_reader") { + hasName = true + } + case "otel.component.type": + if a.Value.AsString() == expectedComponentType { + hasType = true + } + } + } + assert.True(t, hasName, "expected non-empty otel.component.name containing 'metric_reader'") + assert.True(t, hasType, "expected otel.component.type == %q", expectedComponentType) +} + +// findMetricByName searches all scopes/metrics for the given metric name. +func findMetricByName(rm *metricdata.ResourceMetrics, name string) *metricdata.Metrics { + for si := range rm.ScopeMetrics { + sm := &rm.ScopeMetrics[si] + for mi := range sm.Metrics { + if sm.Metrics[mi].Name == name { + return &sm.Metrics[mi] + } + } + } + return nil +} + +// createMetricDataTestProducerForManual creates a producer using patterns from metricdatatest for manual reader benchmarks. +func createMetricDataTestProducerForManual() testSDKProducer { + return testSDKProducer{ + produceFunc: func(_ context.Context, rm *metricdata.ResourceMetrics) error { + start := time.Now().Add(-time.Minute) + end := time.Now() + + // Create attribute sets using common patterns + aliceAttrs := attribute.NewSet(attribute.String("user", "alice"), attribute.String("env", "prod")) + bobAttrs := attribute.NewSet(attribute.String("user", "bob"), attribute.String("env", "staging")) + charlieAttrs := attribute.NewSet(attribute.String("user", "charlie"), attribute.String("env", "dev")) + + // Create exemplars for histogram metrics + exemplars := []metricdata.Exemplar[float64]{ + { + FilteredAttributes: []attribute.KeyValue{attribute.String("trace", "span-1")}, + Time: end, + Value: 15.5, + SpanID: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + TraceID: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + } + + // Define different metric types using metricdatatest patterns + createScopeMetrics := func(scopeIdx int) metricdata.ScopeMetrics { + metrics := []metricdata.Metrics{ + // Counter metrics + { + Name: fmt.Sprintf("requests_total_%d", scopeIdx), + Description: "Total number of requests", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: aliceAttrs, StartTime: start, Time: end, Value: 100 + int64(scopeIdx*10)}, + {Attributes: bobAttrs, StartTime: start, Time: end, Value: 150 + int64(scopeIdx*15)}, + {Attributes: charlieAttrs, StartTime: start, Time: end, Value: 75 + int64(scopeIdx*5)}, + }, + }, + }, + // Gauge metrics + { + Name: fmt.Sprintf("memory_usage_%d", scopeIdx), + Description: "Memory usage in MB", + Unit: "MB", + Data: metricdata.Gauge[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + {Attributes: aliceAttrs, Time: end, Value: 512.5 + float64(scopeIdx*10)}, + {Attributes: bobAttrs, Time: end, Value: 768.2 + float64(scopeIdx*20)}, + {Attributes: charlieAttrs, Time: end, Value: 256.8 + float64(scopeIdx*5)}, + }, + }, + }, + // Histogram metrics + { + Name: fmt.Sprintf("request_duration_%d", scopeIdx), + Description: "Request duration histogram", + Unit: "ms", + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: aliceAttrs, + StartTime: start, + Time: end, + Count: 100, + Sum: 1500.5, + Min: metricdata.NewExtrema(1.0), + Max: metricdata.NewExtrema(50.0), + Bounds: []float64{1, 5, 10, 25, 50, 100, 250, 500}, + BucketCounts: []uint64{10, 20, 30, 25, 10, 4, 1, 0, 0}, + Exemplars: exemplars, + }, + { + Attributes: bobAttrs, + StartTime: start, + Time: end, + Count: 80, + Sum: 1200.3, + Min: metricdata.NewExtrema(2.0), + Max: metricdata.NewExtrema(45.0), + Bounds: []float64{1, 5, 10, 25, 50, 100, 250, 500}, + BucketCounts: []uint64{5, 15, 25, 20, 10, 4, 1, 0, 0}, + Exemplars: exemplars, + }, + }, + }, + }, + // Exponential Histogram + { + Name: fmt.Sprintf("response_size_%d", scopeIdx), + Description: "Response size exponential histogram", + Unit: "bytes", + Data: metricdata.ExponentialHistogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[float64]{ + { + Attributes: aliceAttrs, + StartTime: start, + Time: end, + Count: 50, + Sum: 25000.0, + Min: metricdata.NewExtrema(100.0), + Max: metricdata.NewExtrema(2000.0), + Scale: 2, + ZeroCount: 2, + Exemplars: exemplars, + }, + }, + }, + }, + } + + return metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: fmt.Sprintf("benchmark/scope/%d", scopeIdx), + Version: "1.0.0", + }, + Metrics: metrics, + } + } + + // Create multiple scopes for comprehensive test data + var scopeMetrics []metricdata.ScopeMetrics + for i := 0; i < 20; i++ { // 20 scopes with 4 metrics each = 80 total metrics + scopeMetrics = append(scopeMetrics, createScopeMetrics(i)) + } + + *rm = metricdata.ResourceMetrics{ + Resource: resource.NewSchemaless(attribute.String("service.name", "benchmark-test")), + ScopeMetrics: scopeMetrics, + } + return nil + }, + } +} + +func BenchmarkManualReaderInstrumentation(b *testing.B) { + run := func(b *testing.B, withInstrumentationMP bool) { + // Save and restore the original global meter provider + orig := otel.GetMeterProvider() + defer otel.SetMeterProvider(orig) + + // Suppress internal logging messages for cleaner benchmark output + global.SetLogger(logr.Discard()) + b.Cleanup(func() { + // Logger will be reset by test cleanup naturally + }) + + // Suppress error handler messages for cleaner benchmark output + origErrorHandler := otel.GetErrorHandler() + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(error) {})) + b.Cleanup(func() { + otel.SetErrorHandler(origErrorHandler) + }) + + if withInstrumentationMP { + // Set up a meter provider for instrumentation to use + instrumentationReader := NewManualReader() + instrumentationMP := NewMeterProvider(WithReader(instrumentationReader)) + otel.SetMeterProvider(instrumentationMP) + + // Clean up the instrumentation meter provider + b.Cleanup(func() { + _ = instrumentationMP.Shutdown(b.Context()) + }) + } + + r := NewManualReader() + // Register with producer using metricdatatest patterns for realistic benchmark data + r.register(createMetricDataTestProducerForManual()) + b.Cleanup(func() { + _ = r.Shutdown(b.Context()) // Ignore error in cleanup + }) + + rm := &metricdata.ResourceMetrics{} + + b.ReportAllocs() + b.ResetTimer() + + for b.Loop() { + // Test the collect operation (simulating what manual readers do) + err := r.Collect(b.Context(), rm) + _ = err // Ignore error for benchmark + } + } + + b.Run("NoObservability", func(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "false") + run(b, false) + }) + + b.Run("Observability", func(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + run(b, true) + }) +}