You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
Instrument manual reader from sdk/metric (#7524)
Resolve: #7009 ### Benchmarks ``` ➜ benchstat /tmp/manual_bench_disabled.txt /tmp/manual_bench_enabled.txt goos: darwin goarch: arm64 pkg: go.opentelemetry.io/otel/sdk/metric cpu: Apple M1 Max │ /tmp/manual_bench_disabled.txt │ /tmp/manual_bench_enabled.txt │ │ sec/op │ sec/op vs base │ ManualReaderInstrumentation/NoObservability-10 35.98µ ± 14% 36.29µ ± 11% ~ (p=0.665 n=40) ManualReaderInstrumentation/Observability-10 26.98µ ± 9% 28.23µ ± 3% ~ (p=0.207 n=40) geomean 31.16µ 32.01µ +2.73% │ /tmp/manual_bench_disabled.txt │ /tmp/manual_bench_enabled.txt │ │ B/op │ B/op vs base │ ManualReaderInstrumentation/NoObservability-10 49.24Ki ± 1% 49.24Ki ± 1% ~ (p=1.000 n=40) ManualReaderInstrumentation/Observability-10 51.78Ki ± 0% 51.78Ki ± 0% ~ (p=0.485 n=40) geomean 50.50Ki 50.50Ki +0.00% │ /tmp/manual_bench_disabled.txt │ /tmp/manual_bench_enabled.txt │ │ allocs/op │ allocs/op vs base │ ManualReaderInstrumentation/NoObservability-10 381.0 ± 1% 381.0 ± 1% ~ (p=1.000 n=40) ManualReaderInstrumentation/Observability-10 387.0 ± 0% 387.0 ± 1% ~ (p=0.485 n=40) geomean 384.0 384.0 +0.00% ``` --------- Co-authored-by: Damien Mathieu <42@dmathieu.com> Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
@@ -21,6 +21,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#7486)
|
||||
- Add experimental observability metrics for simple span processor in `go.opentelemetry.io/otel/sdk/trace`. (#7374)
|
||||
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#7512)
|
||||
- Add experimental observability metrics for manual reader in `go.opentelemetry.io/otel/sdk/metric`. (#7524)
|
||||
|
||||
### Fixed
|
||||
|
||||
|
||||
@@ -0,0 +1,168 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
// Package observ provides experimental observability instrumentation for the
|
||||
// metric reader.
|
||||
package observ // import "go.opentelemetry.io/otel/sdk/metric/internal/observ"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/sdk"
|
||||
"go.opentelemetry.io/otel/sdk/internal/x"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
|
||||
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
|
||||
)
|
||||
|
||||
const (
|
||||
// ScopeName is the unique name of the meter used for instrumentation.
|
||||
ScopeName = "go.opentelemetry.io/otel/sdk/metric/internal/observ"
|
||||
|
||||
// SchemaURL is the schema URL of the metrics produced by this
|
||||
// instrumentation.
|
||||
SchemaURL = semconv.SchemaURL
|
||||
)
|
||||
|
||||
var (
|
||||
measureAttrsPool = &sync.Pool{
|
||||
New: func() any {
|
||||
const n = 1 + // component.name
|
||||
1 + // component.type
|
||||
1 // error.type
|
||||
s := make([]attribute.KeyValue, 0, n)
|
||||
// Return a pointer to a slice instead of a slice itself
|
||||
// to avoid allocations on every call.
|
||||
return &s
|
||||
},
|
||||
}
|
||||
|
||||
recordOptPool = &sync.Pool{
|
||||
New: func() any {
|
||||
const n = 1 // WithAttributeSet
|
||||
o := make([]metric.RecordOption, 0, n)
|
||||
return &o
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func get[T any](p *sync.Pool) *[]T { return p.Get().(*[]T) }
|
||||
|
||||
func put[T any](p *sync.Pool, s *[]T) {
|
||||
*s = (*s)[:0] // Reset.
|
||||
p.Put(s)
|
||||
}
|
||||
|
||||
// ComponentName returns the component name for the metric reader with the
|
||||
// provided ComponentType and ID.
|
||||
func ComponentName(componentType string, id int64) string {
|
||||
return fmt.Sprintf("%s/%d", componentType, id)
|
||||
}
|
||||
|
||||
// Instrumentation is experimental instrumentation for the metric reader.
|
||||
type Instrumentation struct {
|
||||
colDuration metric.Float64Histogram
|
||||
|
||||
attrs []attribute.KeyValue
|
||||
recOpt metric.RecordOption
|
||||
}
|
||||
|
||||
// NewInstrumentation returns instrumentation for metric reader with the provided component
|
||||
// type (such as periodic and manual metric reader) and ID. It uses the global
|
||||
// MeterProvider to create the instrumentation.
|
||||
//
|
||||
// The id should be the unique metric reader instance ID. It is used
|
||||
// to set the "component.name" attribute.
|
||||
//
|
||||
// If the experimental observability is disabled, nil is returned.
|
||||
func NewInstrumentation(componentType string, id int64) (*Instrumentation, error) {
|
||||
if !x.Observability.Enabled() {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
i := &Instrumentation{
|
||||
attrs: []attribute.KeyValue{
|
||||
semconv.OTelComponentName(ComponentName(componentType, id)),
|
||||
semconv.OTelComponentTypeKey.String(componentType),
|
||||
},
|
||||
}
|
||||
|
||||
r := attribute.NewSet(i.attrs...)
|
||||
i.recOpt = metric.WithAttributeSet(r)
|
||||
|
||||
meter := otel.GetMeterProvider().Meter(
|
||||
ScopeName,
|
||||
metric.WithInstrumentationVersion(sdk.Version()),
|
||||
metric.WithSchemaURL(SchemaURL),
|
||||
)
|
||||
|
||||
colDuration, err := otelconv.NewSDKMetricReaderCollectionDuration(meter)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to create collection duration metric: %w", err)
|
||||
}
|
||||
i.colDuration = colDuration.Inst()
|
||||
|
||||
return i, err
|
||||
}
|
||||
|
||||
// CollectMetrics instruments the collect method of metric reader. It returns an
|
||||
// [CollectOp] that must have its [CollectOp.End] method called when the
|
||||
// collection end.
|
||||
func (i *Instrumentation) CollectMetrics(ctx context.Context) CollectOp {
|
||||
start := time.Now()
|
||||
|
||||
return CollectOp{
|
||||
ctx: ctx,
|
||||
start: start,
|
||||
inst: i,
|
||||
}
|
||||
}
|
||||
|
||||
// CollectOp tracks the collect operation being observed by
|
||||
// [Instrumentation.CollectMetrics].
|
||||
type CollectOp struct {
|
||||
ctx context.Context
|
||||
start time.Time
|
||||
|
||||
inst *Instrumentation
|
||||
}
|
||||
|
||||
// End completes the observation of the operation being observed by a call to
|
||||
// [Instrumentation.CollectMetrics].
|
||||
//
|
||||
// Any error that is encountered is provided as err.
|
||||
func (e CollectOp) End(err error) {
|
||||
recOpt := get[metric.RecordOption](recordOptPool)
|
||||
defer put(recordOptPool, recOpt)
|
||||
*recOpt = append(*recOpt, e.inst.recordOption(err))
|
||||
|
||||
d := time.Since(e.start).Seconds()
|
||||
e.inst.colDuration.Record(e.ctx, d, *recOpt...)
|
||||
}
|
||||
|
||||
// recordOption returns a RecordOption with attributes representing the
|
||||
// outcome of the collection being recorded.
|
||||
//
|
||||
// If err is nil, the default recOpt of the Instrumentation is returned.
|
||||
//
|
||||
// Otherwise, a new RecordOption is returned with the base attributes of the
|
||||
// Instrumentation plus the error.type attribute set to the type of the error.
|
||||
func (i *Instrumentation) recordOption(err error) metric.RecordOption {
|
||||
if err == nil {
|
||||
return i.recOpt
|
||||
}
|
||||
|
||||
attrs := get[attribute.KeyValue](measureAttrsPool)
|
||||
defer put(measureAttrsPool, attrs)
|
||||
*attrs = append(*attrs, i.attrs...)
|
||||
*attrs = append(*attrs, semconv.ErrorType(err))
|
||||
|
||||
// Do not inefficiently make a copy of attrs by using WithAttributes
|
||||
// instead of WithAttributeSet.
|
||||
return metric.WithAttributeSet(attribute.NewSet(*attrs...))
|
||||
}
|
||||
@@ -0,0 +1,217 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package observ_test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
mapi "go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/sdk"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal/observ"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
|
||||
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
|
||||
)
|
||||
|
||||
const (
|
||||
ID = int64(42)
|
||||
ComponentType = "test-reader"
|
||||
)
|
||||
|
||||
var Scope = instrumentation.Scope{
|
||||
Name: observ.ScopeName,
|
||||
Version: sdk.Version(),
|
||||
SchemaURL: observ.SchemaURL,
|
||||
}
|
||||
|
||||
type errMeterProvider struct {
|
||||
mapi.MeterProvider
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter {
|
||||
return &errMeter{err: m.err}
|
||||
}
|
||||
|
||||
type errMeter struct {
|
||||
mapi.Meter
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *errMeter) Float64Histogram(string, ...mapi.Float64HistogramOption) (mapi.Float64Histogram, error) {
|
||||
return nil, m.err
|
||||
}
|
||||
|
||||
func TestNewInstrumentationObservabilityErrors(t *testing.T) {
|
||||
orig := otel.GetMeterProvider()
|
||||
t.Cleanup(func() { otel.SetMeterProvider(orig) })
|
||||
mp := &errMeterProvider{err: assert.AnError}
|
||||
otel.SetMeterProvider(mp)
|
||||
|
||||
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||
|
||||
_, err := observ.NewInstrumentation(ComponentType, ID)
|
||||
require.ErrorIs(t, err, assert.AnError, "new instrument errors should be joined")
|
||||
|
||||
assert.ErrorContains(t, err, "collection duration metric")
|
||||
}
|
||||
|
||||
func TestNewInstrumentationObservabilityDisabled(t *testing.T) {
|
||||
// Do not set OTEL_GO_X_OBSERVABILITY.
|
||||
got, err := observ.NewInstrumentation(ComponentType, ID)
|
||||
assert.NoError(t, err)
|
||||
assert.Nil(t, got)
|
||||
}
|
||||
|
||||
// setup installs a ManualReader MeterProvider and returns an instantiated
|
||||
// Instrumentation plus a collector that returns the single ScopeMetrics group.
|
||||
func setup(t *testing.T) (*observ.Instrumentation, func() metricdata.ScopeMetrics) {
|
||||
t.Helper()
|
||||
|
||||
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||
|
||||
original := otel.GetMeterProvider()
|
||||
t.Cleanup(func() { otel.SetMeterProvider(original) })
|
||||
|
||||
r := metric.NewManualReader()
|
||||
mp := metric.NewMeterProvider(metric.WithReader(r))
|
||||
otel.SetMeterProvider(mp)
|
||||
|
||||
inst, err := observ.NewInstrumentation(ComponentType, ID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, inst)
|
||||
|
||||
return inst, func() metricdata.ScopeMetrics {
|
||||
var rm metricdata.ResourceMetrics
|
||||
require.NoError(t, r.Collect(t.Context(), &rm))
|
||||
require.Len(t, rm.ScopeMetrics, 1)
|
||||
return rm.ScopeMetrics[0]
|
||||
}
|
||||
}
|
||||
|
||||
func baseAttrs(err error) []attribute.KeyValue {
|
||||
attrs := []attribute.KeyValue{
|
||||
semconv.OTelComponentName(observ.ComponentName(ComponentType, ID)),
|
||||
semconv.OTelComponentTypeKey.String(ComponentType),
|
||||
}
|
||||
if err != nil {
|
||||
attrs = append(attrs, semconv.ErrorType(err))
|
||||
}
|
||||
return attrs
|
||||
}
|
||||
|
||||
func set(err error) attribute.Set {
|
||||
return attribute.NewSet(baseAttrs(err)...)
|
||||
}
|
||||
|
||||
func collectionDuration(err error) metricdata.Metrics {
|
||||
return metricdata.Metrics{
|
||||
Name: otelconv.SDKMetricReaderCollectionDuration{}.Name(),
|
||||
Description: otelconv.SDKMetricReaderCollectionDuration{}.Description(),
|
||||
Unit: otelconv.SDKMetricReaderCollectionDuration{}.Unit(),
|
||||
Data: metricdata.Histogram[float64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
DataPoints: []metricdata.HistogramDataPoint[float64]{
|
||||
{Attributes: set(err)},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func assertCollectionMetrics(t *testing.T, got metricdata.ScopeMetrics, err error) {
|
||||
t.Helper()
|
||||
|
||||
assert.Equal(t, Scope, got.Scope, "unexpected scope")
|
||||
|
||||
m := got.Metrics
|
||||
require.Len(t, m, 1, "expected 1 metric (collection duration)")
|
||||
|
||||
want := collectionDuration(err)
|
||||
metricdatatest.AssertEqual(t, want, m[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
|
||||
}
|
||||
|
||||
func TestInstrumentationCollectMetricsSuccess(t *testing.T) {
|
||||
inst, collect := setup(t)
|
||||
|
||||
inst.CollectMetrics(t.Context()).End(nil)
|
||||
|
||||
assertCollectionMetrics(t, collect(), nil)
|
||||
}
|
||||
|
||||
func TestInstrumentationCollectMetricsError(t *testing.T) {
|
||||
inst, collect := setup(t)
|
||||
|
||||
wantErr := assert.AnError
|
||||
inst.CollectMetrics(t.Context()).End(wantErr)
|
||||
|
||||
assertCollectionMetrics(t, collect(), wantErr)
|
||||
}
|
||||
|
||||
func TestComponentName(t *testing.T) {
|
||||
tests := []struct {
|
||||
componentType string
|
||||
id int64
|
||||
want string
|
||||
}{
|
||||
{componentType: "periodic_metric_reader", id: 0, want: "periodic_metric_reader/0"},
|
||||
{componentType: "periodic_metric_reader", id: 1, want: "periodic_metric_reader/1"},
|
||||
{componentType: "periodic_metric_reader", id: 42, want: "periodic_metric_reader/42"},
|
||||
{componentType: "periodic_metric_reader", id: -1, want: "periodic_metric_reader/-1"},
|
||||
{componentType: "manual_metric_reader", id: 0, want: "manual_metric_reader/0"},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
got := observ.ComponentName(tt.componentType, tt.id)
|
||||
assert.Equal(t, tt.want, got)
|
||||
}
|
||||
}
|
||||
|
||||
func setupBench(b *testing.B) *observ.Instrumentation {
|
||||
b.Helper()
|
||||
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||
|
||||
// Set up a proper MeterProvider for benchmarks
|
||||
original := otel.GetMeterProvider()
|
||||
b.Cleanup(func() { otel.SetMeterProvider(original) })
|
||||
|
||||
r := metric.NewManualReader()
|
||||
mp := metric.NewMeterProvider(metric.WithReader(r))
|
||||
otel.SetMeterProvider(mp)
|
||||
|
||||
inst, err := observ.NewInstrumentation(ComponentType, ID)
|
||||
if err != nil {
|
||||
b.Fatalf("failed to create instrumentation: %v", err)
|
||||
}
|
||||
if inst == nil {
|
||||
b.Fatal("expected instrumentation, got nil")
|
||||
}
|
||||
return inst
|
||||
}
|
||||
|
||||
func BenchmarkInstrumentationCollectMetrics(b *testing.B) {
|
||||
run := func(err error) func(*testing.B) {
|
||||
inst := setupBench(b)
|
||||
return func(b *testing.B) {
|
||||
ctx := b.Context()
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
inst.CollectMetrics(ctx).End(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err := errors.New("benchmark error")
|
||||
b.Run("NoError", run(nil))
|
||||
b.Run("Error", run(err))
|
||||
}
|
||||
@@ -10,10 +10,18 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/internal/global"
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal/observ"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
)
|
||||
|
||||
const (
|
||||
// ManualReaderType uniquely identifies the OpenTelemetry Metric Reader component
|
||||
// being instrumented.
|
||||
manualReaderType = "go.opentelemetry.io/otel/sdk/metric/metric.ManualReader"
|
||||
)
|
||||
|
||||
// ManualReader is a simple Reader that allows an application to
|
||||
// read metrics on demand.
|
||||
type ManualReader struct {
|
||||
@@ -26,6 +34,8 @@ type ManualReader struct {
|
||||
|
||||
temporalitySelector TemporalitySelector
|
||||
aggregationSelector AggregationSelector
|
||||
|
||||
inst *observ.Instrumentation
|
||||
}
|
||||
|
||||
// Compile time check the manualReader implements Reader and is comparable.
|
||||
@@ -39,9 +49,24 @@ func NewManualReader(opts ...ManualReaderOption) *ManualReader {
|
||||
aggregationSelector: cfg.aggregationSelector,
|
||||
}
|
||||
r.externalProducers.Store(cfg.producers)
|
||||
|
||||
var err error
|
||||
r.inst, err = observ.NewInstrumentation(manualReaderType, nextManualReaderID())
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
}
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
var manualReaderIDCounter atomic.Int64
|
||||
|
||||
// nextManualReaderID returns an identifier for this manual reader,
|
||||
// starting with 0 and incrementing by 1 each time it is called.
|
||||
func nextManualReaderID() int64 {
|
||||
return manualReaderIDCounter.Add(1) - 1
|
||||
}
|
||||
|
||||
// register stores the sdkProducer which enables the caller
|
||||
// to read metrics from the SDK on demand.
|
||||
func (mr *ManualReader) register(p sdkProducer) {
|
||||
@@ -93,12 +118,20 @@ func (mr *ManualReader) Shutdown(context.Context) error {
|
||||
//
|
||||
// This method is safe to call concurrently.
|
||||
func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
|
||||
var err error
|
||||
if mr.inst != nil {
|
||||
cp := mr.inst.CollectMetrics(ctx)
|
||||
defer func() { cp.End(err) }()
|
||||
}
|
||||
|
||||
if rm == nil {
|
||||
return errors.New("manual reader: *metricdata.ResourceMetrics is nil")
|
||||
err = errors.New("manual reader: *metricdata.ResourceMetrics is nil")
|
||||
return err
|
||||
}
|
||||
p := mr.sdkProducer.Load()
|
||||
if p == nil {
|
||||
return ErrReaderNotRegistered
|
||||
err = ErrReaderNotRegistered
|
||||
return err
|
||||
}
|
||||
|
||||
ph, ok := p.(produceHolder)
|
||||
@@ -107,11 +140,11 @@ func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr
|
||||
// this should never happen. In the unforeseen case that this does
|
||||
// happen, return an error instead of panicking so a users code does
|
||||
// not halt in the processes.
|
||||
err := fmt.Errorf("manual reader: invalid producer: %T", p)
|
||||
err = fmt.Errorf("manual reader: invalid producer: %T", p)
|
||||
return err
|
||||
}
|
||||
|
||||
err := ph.produce(ctx, rm)
|
||||
err = ph.produce(ctx, rm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -5,14 +5,24 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/internal/global"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
|
||||
)
|
||||
|
||||
func TestManualReader(t *testing.T) {
|
||||
@@ -112,3 +122,282 @@ func TestManualReaderCollect(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestManualReaderInstrumentation(t *testing.T) {
|
||||
// Enable SDK observability.
|
||||
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||
|
||||
// ManualReader under test with a fake producer.
|
||||
manualReader := NewManualReader()
|
||||
t.Cleanup(func() { _ = manualReader.Shutdown(t.Context()) })
|
||||
manualReader.register(testSDKProducer{})
|
||||
|
||||
// Exercise a collect (producer data).
|
||||
var got metricdata.ResourceMetrics
|
||||
require.NoError(t, manualReader.Collect(t.Context(), &got))
|
||||
|
||||
// Collect again so we have something to scan through.
|
||||
var rm metricdata.ResourceMetrics
|
||||
require.NoError(t, manualReader.Collect(t.Context(), &rm))
|
||||
require.NotEmpty(t, rm.ScopeMetrics)
|
||||
|
||||
targetName := otelconv.SDKMetricReaderCollectionDuration{}.Name()
|
||||
targetDesc := otelconv.SDKMetricReaderCollectionDuration{}.Description()
|
||||
targetUnit := otelconv.SDKMetricReaderCollectionDuration{}.Unit()
|
||||
|
||||
// Find the SDK reader self-metric anywhere in the collected data.
|
||||
foundMetric := findMetricByName(&rm, targetName)
|
||||
|
||||
// If not found, explain and skip (this metric is emitted via the *global* MP).
|
||||
if foundMetric == nil {
|
||||
t.Skipf("SDK reader self-metric %q not found. It is emitted via the global MeterProvider; "+
|
||||
"this test does not install a global MP.", targetName)
|
||||
return
|
||||
}
|
||||
|
||||
// Basic identity checks.
|
||||
assert.Equal(t, targetName, foundMetric.Name)
|
||||
assert.Equal(t, targetDesc, foundMetric.Description)
|
||||
assert.Equal(t, targetUnit, foundMetric.Unit)
|
||||
|
||||
// Must be a histogram with cumulative temporality.
|
||||
hist, ok := foundMetric.Data.(metricdata.Histogram[float64])
|
||||
require.True(t, ok, "expected histogram data")
|
||||
assert.Equal(t, metricdata.CumulativeTemporality, hist.Temporality)
|
||||
require.NotEmpty(t, hist.DataPoints)
|
||||
|
||||
// Check base attributes on one datapoint (flexibly).
|
||||
dp := hist.DataPoints[0]
|
||||
attrs := dp.Attributes.ToSlice()
|
||||
t.Logf("observability attrs: %v", attrs)
|
||||
|
||||
const expectedComponentType = "go.opentelemetry.io/otel/sdk/metric/metric.ManualReader"
|
||||
|
||||
var hasName, hasType bool
|
||||
for _, a := range attrs {
|
||||
switch a.Key {
|
||||
case "otel.component.name":
|
||||
if s := a.Value.AsString(); s != "" && strings.Contains(s, "metric_reader") {
|
||||
hasName = true
|
||||
}
|
||||
case "otel.component.type":
|
||||
if a.Value.AsString() == expectedComponentType {
|
||||
hasType = true
|
||||
}
|
||||
}
|
||||
}
|
||||
assert.True(t, hasName, "expected non-empty otel.component.name containing 'metric_reader'")
|
||||
assert.True(t, hasType, "expected otel.component.type == %q", expectedComponentType)
|
||||
}
|
||||
|
||||
// findMetricByName searches all scopes/metrics for the given metric name.
|
||||
func findMetricByName(rm *metricdata.ResourceMetrics, name string) *metricdata.Metrics {
|
||||
for si := range rm.ScopeMetrics {
|
||||
sm := &rm.ScopeMetrics[si]
|
||||
for mi := range sm.Metrics {
|
||||
if sm.Metrics[mi].Name == name {
|
||||
return &sm.Metrics[mi]
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// createMetricDataTestProducerForManual creates a producer using patterns from metricdatatest for manual reader benchmarks.
|
||||
func createMetricDataTestProducerForManual() testSDKProducer {
|
||||
return testSDKProducer{
|
||||
produceFunc: func(_ context.Context, rm *metricdata.ResourceMetrics) error {
|
||||
start := time.Now().Add(-time.Minute)
|
||||
end := time.Now()
|
||||
|
||||
// Create attribute sets using common patterns
|
||||
aliceAttrs := attribute.NewSet(attribute.String("user", "alice"), attribute.String("env", "prod"))
|
||||
bobAttrs := attribute.NewSet(attribute.String("user", "bob"), attribute.String("env", "staging"))
|
||||
charlieAttrs := attribute.NewSet(attribute.String("user", "charlie"), attribute.String("env", "dev"))
|
||||
|
||||
// Create exemplars for histogram metrics
|
||||
exemplars := []metricdata.Exemplar[float64]{
|
||||
{
|
||||
FilteredAttributes: []attribute.KeyValue{attribute.String("trace", "span-1")},
|
||||
Time: end,
|
||||
Value: 15.5,
|
||||
SpanID: []byte{1, 2, 3, 4, 5, 6, 7, 8},
|
||||
TraceID: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
|
||||
},
|
||||
}
|
||||
|
||||
// Define different metric types using metricdatatest patterns
|
||||
createScopeMetrics := func(scopeIdx int) metricdata.ScopeMetrics {
|
||||
metrics := []metricdata.Metrics{
|
||||
// Counter metrics
|
||||
{
|
||||
Name: fmt.Sprintf("requests_total_%d", scopeIdx),
|
||||
Description: "Total number of requests",
|
||||
Unit: "1",
|
||||
Data: metricdata.Sum[int64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: true,
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{Attributes: aliceAttrs, StartTime: start, Time: end, Value: 100 + int64(scopeIdx*10)},
|
||||
{Attributes: bobAttrs, StartTime: start, Time: end, Value: 150 + int64(scopeIdx*15)},
|
||||
{Attributes: charlieAttrs, StartTime: start, Time: end, Value: 75 + int64(scopeIdx*5)},
|
||||
},
|
||||
},
|
||||
},
|
||||
// Gauge metrics
|
||||
{
|
||||
Name: fmt.Sprintf("memory_usage_%d", scopeIdx),
|
||||
Description: "Memory usage in MB",
|
||||
Unit: "MB",
|
||||
Data: metricdata.Gauge[float64]{
|
||||
DataPoints: []metricdata.DataPoint[float64]{
|
||||
{Attributes: aliceAttrs, Time: end, Value: 512.5 + float64(scopeIdx*10)},
|
||||
{Attributes: bobAttrs, Time: end, Value: 768.2 + float64(scopeIdx*20)},
|
||||
{Attributes: charlieAttrs, Time: end, Value: 256.8 + float64(scopeIdx*5)},
|
||||
},
|
||||
},
|
||||
},
|
||||
// Histogram metrics
|
||||
{
|
||||
Name: fmt.Sprintf("request_duration_%d", scopeIdx),
|
||||
Description: "Request duration histogram",
|
||||
Unit: "ms",
|
||||
Data: metricdata.Histogram[float64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
DataPoints: []metricdata.HistogramDataPoint[float64]{
|
||||
{
|
||||
Attributes: aliceAttrs,
|
||||
StartTime: start,
|
||||
Time: end,
|
||||
Count: 100,
|
||||
Sum: 1500.5,
|
||||
Min: metricdata.NewExtrema(1.0),
|
||||
Max: metricdata.NewExtrema(50.0),
|
||||
Bounds: []float64{1, 5, 10, 25, 50, 100, 250, 500},
|
||||
BucketCounts: []uint64{10, 20, 30, 25, 10, 4, 1, 0, 0},
|
||||
Exemplars: exemplars,
|
||||
},
|
||||
{
|
||||
Attributes: bobAttrs,
|
||||
StartTime: start,
|
||||
Time: end,
|
||||
Count: 80,
|
||||
Sum: 1200.3,
|
||||
Min: metricdata.NewExtrema(2.0),
|
||||
Max: metricdata.NewExtrema(45.0),
|
||||
Bounds: []float64{1, 5, 10, 25, 50, 100, 250, 500},
|
||||
BucketCounts: []uint64{5, 15, 25, 20, 10, 4, 1, 0, 0},
|
||||
Exemplars: exemplars,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// Exponential Histogram
|
||||
{
|
||||
Name: fmt.Sprintf("response_size_%d", scopeIdx),
|
||||
Description: "Response size exponential histogram",
|
||||
Unit: "bytes",
|
||||
Data: metricdata.ExponentialHistogram[float64]{
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
DataPoints: []metricdata.ExponentialHistogramDataPoint[float64]{
|
||||
{
|
||||
Attributes: aliceAttrs,
|
||||
StartTime: start,
|
||||
Time: end,
|
||||
Count: 50,
|
||||
Sum: 25000.0,
|
||||
Min: metricdata.NewExtrema(100.0),
|
||||
Max: metricdata.NewExtrema(2000.0),
|
||||
Scale: 2,
|
||||
ZeroCount: 2,
|
||||
Exemplars: exemplars,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return metricdata.ScopeMetrics{
|
||||
Scope: instrumentation.Scope{
|
||||
Name: fmt.Sprintf("benchmark/scope/%d", scopeIdx),
|
||||
Version: "1.0.0",
|
||||
},
|
||||
Metrics: metrics,
|
||||
}
|
||||
}
|
||||
|
||||
// Create multiple scopes for comprehensive test data
|
||||
var scopeMetrics []metricdata.ScopeMetrics
|
||||
for i := 0; i < 20; i++ { // 20 scopes with 4 metrics each = 80 total metrics
|
||||
scopeMetrics = append(scopeMetrics, createScopeMetrics(i))
|
||||
}
|
||||
|
||||
*rm = metricdata.ResourceMetrics{
|
||||
Resource: resource.NewSchemaless(attribute.String("service.name", "benchmark-test")),
|
||||
ScopeMetrics: scopeMetrics,
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkManualReaderInstrumentation(b *testing.B) {
|
||||
run := func(b *testing.B, withInstrumentationMP bool) {
|
||||
// Save and restore the original global meter provider
|
||||
orig := otel.GetMeterProvider()
|
||||
defer otel.SetMeterProvider(orig)
|
||||
|
||||
// Suppress internal logging messages for cleaner benchmark output
|
||||
global.SetLogger(logr.Discard())
|
||||
b.Cleanup(func() {
|
||||
// Logger will be reset by test cleanup naturally
|
||||
})
|
||||
|
||||
// Suppress error handler messages for cleaner benchmark output
|
||||
origErrorHandler := otel.GetErrorHandler()
|
||||
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(error) {}))
|
||||
b.Cleanup(func() {
|
||||
otel.SetErrorHandler(origErrorHandler)
|
||||
})
|
||||
|
||||
if withInstrumentationMP {
|
||||
// Set up a meter provider for instrumentation to use
|
||||
instrumentationReader := NewManualReader()
|
||||
instrumentationMP := NewMeterProvider(WithReader(instrumentationReader))
|
||||
otel.SetMeterProvider(instrumentationMP)
|
||||
|
||||
// Clean up the instrumentation meter provider
|
||||
b.Cleanup(func() {
|
||||
_ = instrumentationMP.Shutdown(b.Context())
|
||||
})
|
||||
}
|
||||
|
||||
r := NewManualReader()
|
||||
// Register with producer using metricdatatest patterns for realistic benchmark data
|
||||
r.register(createMetricDataTestProducerForManual())
|
||||
b.Cleanup(func() {
|
||||
_ = r.Shutdown(b.Context()) // Ignore error in cleanup
|
||||
})
|
||||
|
||||
rm := &metricdata.ResourceMetrics{}
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
for b.Loop() {
|
||||
// Test the collect operation (simulating what manual readers do)
|
||||
err := r.Collect(b.Context(), rm)
|
||||
_ = err // Ignore error for benchmark
|
||||
}
|
||||
}
|
||||
|
||||
b.Run("NoObservability", func(b *testing.B) {
|
||||
b.Setenv("OTEL_GO_X_OBSERVABILITY", "false")
|
||||
run(b, false)
|
||||
})
|
||||
|
||||
b.Run("Observability", func(b *testing.B) {
|
||||
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||
run(b, true)
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user