1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-10-31 00:07:40 +02:00

Added the internal/observ package to log (#7532)

-  part of #7016

```txt
goos: darwin
goarch: arm64
pkg: go.opentelemetry.io/otel/sdk/log/internal/observ
cpu: Apple M3
                            │ bench_res.txt │
                            │    sec/op     │
SLP/LogProcessed-8             0.8900n ± 2%
SLP/LogProcessedWithError-8     119.2n ± 5%
geomean                         10.30n

                            │ bench_res.txt │
                            │     B/op      │
SLP/LogProcessed-8             0.000 ± 0%
SLP/LogProcessedWithError-8    232.0 ± 0%
geomean                                   ¹
¹ summaries must be >0 to compute geomean

                            │ bench_res.txt │
                            │   allocs/op   │
SLP/LogProcessed-8             0.000 ± 0%
SLP/LogProcessedWithError-8    3.000 ± 0%
geomean                                   ¹
¹ summaries must be >0 to compute geomean
```

---------

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
Co-authored-by: Damien Mathieu <42@dmathieu.com>
This commit is contained in:
ian
2025-10-24 16:06:48 +08:00
committed by GitHub
parent f1ba3197d5
commit b15942f345
3 changed files with 317 additions and 0 deletions

View File

@@ -0,0 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package observ provides observability instrumentation for the OTel log SDK
// package.
package observ // import "go.opentelemetry.io/otel/sdk/log/internal/observ"

View File

@@ -0,0 +1,107 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package observ // import "go.opentelemetry.io/otel/sdk/log/internal/observ"
import (
"context"
"fmt"
"sync"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/log/internal/x"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)
const (
// ScopeName is the name of the instrumentation scope.
ScopeName = "go.opentelemetry.io/otel/sdk/log/internal/observ"
)
var measureAttrsPool = sync.Pool{
New: func() any {
// "component.name" + "component.type" + "error.type"
const n = 1 + 1 + 1
s := make([]attribute.KeyValue, 0, n)
// Return a pointer to a slice instead of a slice itself
// to avoid allocations on every call.
return &s
},
}
// GetSLPComponentName returns the component name attribute for a
// SimpleLogProcessor with the given ID.
func GetSLPComponentName(id int64) attribute.KeyValue {
t := otelconv.ComponentTypeSimpleLogProcessor
name := fmt.Sprintf("%s/%d", t, id)
return semconv.OTelComponentName(name)
}
// SLP is the instrumentation for an OTel SDK SimpleLogProcessor.
type SLP struct {
processed metric.Int64Counter
attrs []attribute.KeyValue
addOpts []metric.AddOption
}
// NewSLP returns instrumentation for an OTel SDK SimpleLogProcessor with the
// provided ID.
//
// If the experimental observability is disabled, nil is returned.
func NewSLP(id int64) (*SLP, error) {
if !x.Observability.Enabled() {
return nil, nil
}
meter := otel.GetMeterProvider()
mt := meter.Meter(
ScopeName,
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)
p, err := otelconv.NewSDKProcessorLogProcessed(mt)
if err != nil {
err = fmt.Errorf("failed to create a processed log metric: %w", err)
return nil, err
}
name := GetSLPComponentName(id)
componentType := p.AttrComponentType(otelconv.ComponentTypeSimpleLogProcessor)
attrs := []attribute.KeyValue{name, componentType}
addOpts := []metric.AddOption{metric.WithAttributeSet(attribute.NewSet(attrs...))}
return &SLP{
processed: p.Inst(),
attrs: attrs,
addOpts: addOpts,
}, nil
}
// LogProcessed records that a log has been processed by the SimpleLogProcessor.
// If err is non-nil, it records the processing error as an attribute.
func (slp *SLP) LogProcessed(ctx context.Context, err error) {
slp.processed.Add(ctx, 1, slp.addOption(err)...)
}
func (slp *SLP) addOption(err error) []metric.AddOption {
if err == nil {
return slp.addOpts
}
attrs := measureAttrsPool.Get().(*[]attribute.KeyValue)
defer func() {
*attrs = (*attrs)[:0] // reset the slice
measureAttrsPool.Put(attrs)
}()
*attrs = append(*attrs, slp.attrs...)
*attrs = append(*attrs, semconv.ErrorType(err))
// Do not inefficiently make a copy of attrs by using
// WithAttributes instead of WithAttributeSet.
return []metric.AddOption{metric.WithAttributeSet(attribute.NewSet(*attrs...))}
}

View File

@@ -0,0 +1,204 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package observ
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
mapi "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)
type errMeterProvider struct {
mapi.MeterProvider
err error
}
func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter {
return &errMeter{err: m.err}
}
type errMeter struct {
mapi.Meter
err error
}
func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) {
return nil, m.err
}
const slpComponentID = 0
func TestNewSLPError(t *testing.T) {
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
orig := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(orig) })
errMp := &errMeterProvider{err: assert.AnError}
otel.SetMeterProvider(errMp)
_, err := NewSLP(slpComponentID)
require.ErrorIs(t, err, assert.AnError)
assert.ErrorContains(t, err, "failed to create a processed log metric")
}
func TestNewSLPDisabled(t *testing.T) {
// Do not set OTEL_GO_X_OBSERVABILITY
bsp, err := NewSLP(slpComponentID)
assert.NoError(t, err)
assert.Nil(t, bsp)
}
func setup(t *testing.T) (*SLP, func() metricdata.ScopeMetrics) {
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
orig := otel.GetMeterProvider()
t.Cleanup(func() {
otel.SetMeterProvider(orig)
})
reader := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(reader))
otel.SetMeterProvider(mp)
slp, err := NewSLP(slpComponentID)
require.NoError(t, err)
require.NotNil(t, slp)
return slp, func() metricdata.ScopeMetrics {
var got metricdata.ResourceMetrics
require.NoError(t, reader.Collect(t.Context(), &got))
require.Len(t, got.ScopeMetrics, 1)
return got.ScopeMetrics[0]
}
}
func processedMetric(err error) metricdata.Metrics {
processed := &otelconv.SDKProcessorLogProcessed{}
attrs := []attribute.KeyValue{
GetSLPComponentName(slpComponentID),
processed.AttrComponentType(otelconv.ComponentTypeSimpleLogProcessor),
}
if err != nil {
attrs = append(attrs, semconv.ErrorType(err))
}
dp := []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attrs...),
Value: 1,
},
}
return metricdata.Metrics{
Name: processed.Name(),
Description: processed.Description(),
Unit: processed.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: dp,
},
}
}
var Scope = instrumentation.Scope{
Name: ScopeName,
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
}
func assertMetric(t *testing.T, got metricdata.ScopeMetrics, err error) {
t.Helper()
assert.Equal(t, Scope, got.Scope, "unexpected scope")
m := got.Metrics
require.Len(t, m, 1, "expected 1 metrics")
o := metricdatatest.IgnoreTimestamp()
want := processedMetric(err)
metricdatatest.AssertEqual(t, want, m[0], o)
}
func TestSLP(t *testing.T) {
t.Run("NoError", func(t *testing.T) {
slp, collect := setup(t)
slp.LogProcessed(t.Context(), nil)
assertMetric(t, collect(), nil)
})
t.Run("Error", func(t *testing.T) {
processErr := errors.New("error processing log")
slp, collect := setup(t)
slp.LogProcessed(t.Context(), processErr)
assertMetric(t, collect(), processErr)
})
}
func BenchmarkSLP(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
newSLP := func(b *testing.B) *SLP {
b.Helper()
slp, err := NewSLP(slpComponentID)
require.NoError(b, err)
require.NotNil(b, slp)
return slp
}
b.Run("LogProcessed", func(b *testing.B) {
orig := otel.GetMeterProvider()
b.Cleanup(func() {
otel.SetMeterProvider(orig)
})
otel.SetMeterProvider(noop.NewMeterProvider())
ssp := newSLP(b)
ctx := b.Context()
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ssp.LogProcessed(ctx, nil)
}
})
})
b.Run("LogProcessedWithError", func(b *testing.B) {
orig := otel.GetMeterProvider()
b.Cleanup(func() {
otel.SetMeterProvider(orig)
})
otel.SetMeterProvider(noop.NewMeterProvider())
slp := newSLP(b)
ctx := b.Context()
processErr := errors.New("error processing log")
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
slp.LogProcessed(ctx, processErr)
}
})
})
}