1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-10-31 00:07:40 +02:00

Added the internal/observ package to otlploghttp (#7484)

- Part of https://github.com/open-telemetry/opentelemetry-go/issues/7018
- Generate x package from shared template

```
goos: darwin
goarch: arm64
pkg: go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/observ
cpu: Apple M3
                                         │ bmark.results │
                                         │    sec/op     │
InstrumentationExportLogs/NoError-8          98.71n ± 0%
InstrumentationExportLogs/PartialError-8     1.145µ ± 2%
InstrumentationExportLogs/FullError-8        1.164µ ± 1%
geomean                                      508.5n

                                         │ bmark.results │
                                         │     B/op      │
InstrumentationExportLogs/NoError-8         0.000 ± 0%
InstrumentationExportLogs/PartialError-8    769.0 ± 0%
InstrumentationExportLogs/FullError-8       769.0 ± 0%
geomean                                                ¹
¹ summaries must be >0 to compute geomean

                                         │ bmark.results │
                                         │   allocs/op   │
InstrumentationExportLogs/NoError-8         0.000 ± 0%
InstrumentationExportLogs/PartialError-8    5.000 ± 0%
InstrumentationExportLogs/FullError-8       5.000 ± 0%
geomean                                                ¹
¹ summaries must be >0 to compute geomean
```

---------

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
ian
2025-10-14 22:46:23 +08:00
committed by GitHub
parent b3129d30be
commit b78550d4a9
14 changed files with 1045 additions and 2 deletions

View File

@@ -7,13 +7,16 @@ retract v0.12.0
require (
github.com/cenkalti/backoff/v5 v5.0.3
github.com/go-logr/logr v1.4.3
github.com/google/go-cmp v0.7.0
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/otel v1.38.0
go.opentelemetry.io/otel/log v0.14.0
go.opentelemetry.io/otel/metric v1.38.0
go.opentelemetry.io/otel/sdk v1.38.0
go.opentelemetry.io/otel/sdk/log v0.14.0
go.opentelemetry.io/otel/sdk/log/logtest v0.14.0
go.opentelemetry.io/otel/sdk/metric v1.38.0
go.opentelemetry.io/otel/trace v1.38.0
go.opentelemetry.io/proto/otlp v1.8.0
google.golang.org/protobuf v1.36.10
@@ -21,13 +24,11 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
golang.org/x/net v0.45.0 // indirect
golang.org/x/sys v0.37.0 // indirect
golang.org/x/text v0.30.0 // indirect

View File

@@ -5,6 +5,9 @@
// package.
package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal"
//go:generate gotmpl --body=../../../../../internal/shared/x/x.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp\" }" --out=x/x.go
//go:generate gotmpl --body=../../../../../internal/shared/x/x_test.go.tmpl "--data={}" --out=x/x_test.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry.go.tmpl "--data={}" --out=retry/retry.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry_test.go.tmpl "--data={}" --out=retry/retry_test.go

View File

@@ -0,0 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package observ provides experimental observability instrumentation for the
// otlploghttp exporter.
package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/observ"

View File

@@ -0,0 +1,347 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/observ"
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"net/netip"
"strconv"
"strings"
"sync"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/x"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)
const (
// ScopeName is the unique name of the meter used for instrumentation.
ScopeName = "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/observ"
// Version is the current version of this instrumentation
//
// This matches the version of the exporter.
Version = internal.Version
)
var (
attrsPool = &sync.Pool{
New: func() any {
const n = 1 + // component.name
1 + // component.type
1 + // server.addr
1 + // server.port
1 + // error.port
1 // http.response.status.code
s := make([]attribute.KeyValue, 0, n)
return &s
},
}
addOptPool = &sync.Pool{
New: func() any {
const n = 1 // WithAttributeSet
s := make([]metric.AddOption, 0, n)
return &s
},
}
recordPool = &sync.Pool{
New: func() any {
const n = 1 // WithAttributeSet
s := make([]metric.RecordOption, 0, n)
return &s
},
}
)
func get[T any](pool *sync.Pool) *[]T {
return pool.Get().(*[]T)
}
func put[T any](pool *sync.Pool, value *[]T) {
*value = (*value)[:0]
pool.Put(value)
}
// GetComponentName returns the constant name for the exporter with the
// provided id.
func GetComponentName(id int64) string {
return fmt.Sprintf("%s/%d", otelconv.ComponentTypeOtlpHTTPLogExporter, id)
}
// Instrumentation is experimental instrumentation for the exporter.
type Instrumentation struct {
inflightMetric metric.Int64UpDownCounter
exportedMetric metric.Int64Counter
operationDuration metric.Float64Histogram
presetAttrs []attribute.KeyValue
addOpt metric.AddOption
recordOpt metric.RecordOption
}
// NewInstrumentation returns instrumentation for otlplog http exporter.
func NewInstrumentation(id int64, target string) (*Instrumentation, error) {
if !x.Observability.Enabled() {
return nil, nil
}
inst := &Instrumentation{}
provider := otel.GetMeterProvider()
m := provider.Meter(
ScopeName,
metric.WithSchemaURL(semconv.SchemaURL),
metric.WithInstrumentationVersion(Version),
)
var e, err error
logInflight, e := otelconv.NewSDKExporterLogInflight(m)
if e != nil {
e = fmt.Errorf("failed to create the inflight metric %w", e)
err = errors.Join(err, e)
}
inst.inflightMetric = logInflight.Inst()
exported, e := otelconv.NewSDKExporterLogExported(m)
if e != nil {
e = fmt.Errorf("failed to create the exported metric %w", e)
err = errors.Join(err, e)
}
inst.exportedMetric = exported.Inst()
operation, e := otelconv.NewSDKExporterOperationDuration(m)
if e != nil {
e = fmt.Errorf("failed to create the operation duration metric %w", e)
err = errors.Join(err, e)
}
inst.operationDuration = operation.Inst()
if err != nil {
return nil, err
}
inst.presetAttrs = setPresetAttrs(GetComponentName(id), target)
inst.addOpt = metric.WithAttributeSet(attribute.NewSet(inst.presetAttrs...))
inst.recordOpt = metric.WithAttributeSet(attribute.NewSet(append(
[]attribute.KeyValue{semconv.HTTPResponseStatusCode(http.StatusOK)},
inst.presetAttrs...,
)...))
return inst, nil
}
func setPresetAttrs(name, target string) []attribute.KeyValue {
addrAttrs := ServerAddrAttrs(target)
attrs := make([]attribute.KeyValue, 0, 2+len(addrAttrs))
attrs = append(
attrs,
semconv.OTelComponentName(name),
semconv.OTelComponentTypeOtlpHTTPLogExporter,
)
attrs = append(attrs, addrAttrs...)
return attrs
}
// ServerAddrAttrs is a function that extracts server address and port attributes
// from a target string.
func ServerAddrAttrs(target string) []attribute.KeyValue {
host, port, err := parseTarget(target)
if err != nil || (host == "" && port < 0) {
if err != nil {
global.Debug("failed to parse target", "target", target, "error", err)
}
return nil
}
if port < 0 {
return []attribute.KeyValue{semconv.ServerAddress(host)}
}
if host == "" {
return []attribute.KeyValue{
semconv.ServerPort(port),
}
}
return []attribute.KeyValue{
semconv.ServerAddress(host),
semconv.ServerPort(port),
}
}
func (i *Instrumentation) ExportLogs(ctx context.Context, count int64) ExportOp {
start := time.Now()
addOpt := get[metric.AddOption](addOptPool)
defer put(addOptPool, addOpt)
*addOpt = append(*addOpt, i.addOpt)
i.inflightMetric.Add(ctx, count, *addOpt...)
return ExportOp{
ctx: ctx,
start: start,
inst: i,
count: count,
}
}
// ExportOp tracks the operationDuration being observed by [Instrumentation.ExportLogs].
type ExportOp struct {
ctx context.Context
start time.Time
inst *Instrumentation
count int64
}
// End completes the observation of the operationDuration being observed by a call to
// [Instrumentation.ExportLogs].
// Any error that is encountered is provided as err.
//
// If err is not nil, all logs will be recorded as failures unless error is of
// type [internal.PartialSuccess]. In the case of a PartialSuccess, the number
// of successfully exported logs will be determined by inspecting the
// RejectedItems field of the PartialSuccess.
func (e ExportOp) End(err error, code int) {
addOpt := get[metric.AddOption](addOptPool)
defer put(addOptPool, addOpt)
*addOpt = append(*addOpt, e.inst.addOpt)
e.inst.inflightMetric.Add(e.ctx, -e.count, *addOpt...)
success := successful(e.count, err)
e.inst.exportedMetric.Add(e.ctx, success, *addOpt...)
if err != nil {
attrs := get[attribute.KeyValue](attrsPool)
defer put(attrsPool, attrs)
*attrs = append(*attrs, e.inst.presetAttrs...)
*attrs = append(*attrs, semconv.ErrorType(err))
a := metric.WithAttributeSet(attribute.NewSet(*attrs...))
e.inst.exportedMetric.Add(e.ctx, e.count-success, a)
}
record := get[metric.RecordOption](recordPool)
defer put(recordPool, record)
*record = append(*record, e.recordOption(err, code))
duration := time.Since(e.start).Seconds()
e.inst.operationDuration.Record(e.ctx, duration, *record...)
}
func (e ExportOp) recordOption(err error, code int) metric.RecordOption {
if err == nil {
return e.inst.recordOpt
}
attrs := get[attribute.KeyValue](attrsPool)
defer put(attrsPool, attrs)
*attrs = append(*attrs, e.inst.presetAttrs...)
*attrs = append(
*attrs,
semconv.HTTPResponseStatusCode(code),
semconv.ErrorType(err),
)
return metric.WithAttributeSet(attribute.NewSet(*attrs...))
}
// successful returns the number of successfully exported logs out of the n
// that were exported based on the provided error.
//
// If err is nil, n is returned. All logs were successfully exported.
//
// If err is not nil and not an [internal.PartialSuccess] error, 0 is returned.
// It is assumed all logs failed to be exported.
//
// If err is an [internal.PartialSuccess] error, the number of successfully
// exported logs is computed by subtracting the RejectedItems field from n. If
// RejectedItems is negative, n is returned. If RejectedItems is greater than
// n, 0 is returned.
func successful(count int64, err error) int64 {
if err == nil {
return count
}
return count - rejected(count, err)
}
var errPool = sync.Pool{
New: func() any {
return new(internal.PartialSuccess)
},
}
// rejected returns how many out of the n logs exporter were rejected based on
// the provided non-nil err.
func rejected(n int64, err error) int64 {
ps := errPool.Get().(*internal.PartialSuccess)
defer errPool.Put(ps)
if errors.As(err, ps) {
// Bound RejectedItems to [0, n]. This should not be needed,
// but be defensive as this is from an external source.
return min(max(ps.RejectedItems, 0), n)
}
// all logs exported
return n
}
// parseEndpoint parses the host and port from target that has the form
// "host[:port]", or it returns an error if the target is not parsable.
//
// If no port is specified, -1 is returned.
//
// If no host is specified, an empty string is returned.
func parseTarget(endpoint string) (string, int, error) {
if ip := parseIP(endpoint); ip != "" {
return ip, -1, nil
}
// If there's no colon, there is no port (IPv6 with no port checked above).
if !strings.Contains(endpoint, ":") {
return endpoint, -1, nil
}
// Otherwise, parse as host:port.
host, portStr, err := net.SplitHostPort(endpoint)
if err != nil {
return "", -1, fmt.Errorf("invalid host:port %q: %w", endpoint, err)
}
const base, bitSize = 10, 16
port16, err := strconv.ParseUint(portStr, base, bitSize)
if err != nil {
return "", -1, fmt.Errorf("invalid port %q: %w", portStr, err)
}
port := int(port16)
return host, port, nil
}
// parseIP attempts to parse the entire target as an IP address.
// It returns the normalized string form of the IP if successful,
// or an empty string if parsing fails.
func parseIP(ip string) string {
// Strip leading and trailing brackets for IPv6 addresses.
if len(ip) >= 2 && ip[0] == '[' && ip[len(ip)-1] == ']' {
ip = ip[1 : len(ip)-1]
}
addr, err := netip.ParseAddr(ip)
if err != nil {
return ""
}
// Return the normalized string form of the IP.
return addr.String()
}

View File

@@ -0,0 +1,388 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package observ
import (
"net/http"
"testing"
"github.com/go-logr/logr"
"github.com/go-logr/logr/testr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal"
"go.opentelemetry.io/otel/internal/global"
mapi "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)
const (
ID = 0
TARGET = "localhost:8080"
)
type errMeterProvider struct {
mapi.MeterProvider
err error
}
func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter {
return &errMeter{err: m.err}
}
type errMeter struct {
mapi.Meter
err error
}
func (m *errMeter) Int64UpDownCounter(string, ...mapi.Int64UpDownCounterOption) (mapi.Int64UpDownCounter, error) {
return nil, m.err
}
func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) {
return nil, m.err
}
func (m *errMeter) Float64Histogram(string, ...mapi.Float64HistogramOption) (mapi.Float64Histogram, error) {
return nil, m.err
}
func TestNewInstrumentationObservabilityErrors(t *testing.T) {
orig := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(orig) })
mp := &errMeterProvider{err: assert.AnError}
otel.SetMeterProvider(mp)
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
_, err := NewInstrumentation(ID, TARGET)
require.ErrorIs(t, err, assert.AnError, "new instrument errors")
assert.ErrorContains(t, err, "inflight metric")
assert.ErrorContains(t, err, "exported metric")
assert.ErrorContains(t, err, "operation duration metric")
}
func TestNewInstrumentationObservabilityDisabled(t *testing.T) {
// Do not set OTEL_GO_X_OBSERVABILITY.
got, err := NewInstrumentation(ID, TARGET)
assert.NoError(t, err)
assert.Nil(t, got)
}
func set(err error) attribute.Set {
attrs := []attribute.KeyValue{
semconv.OTelComponentName(GetComponentName(ID)),
semconv.OTelComponentTypeKey.String(string(otelconv.ComponentTypeOtlpHTTPLogExporter)),
}
attrs = append(attrs, ServerAddrAttrs(TARGET)...)
if err != nil {
attrs = append(attrs, semconv.ErrorType(err))
}
return attribute.NewSet(attrs...)
}
func inflightMetric() metricdata.Metrics {
inflight := otelconv.SDKExporterLogInflight{}
return metricdata.Metrics{
Name: inflight.Name(),
Description: inflight.Description(),
Unit: inflight.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: set(nil),
Value: 0,
},
},
},
}
}
func exportedMetric(err error, total, success int64) metricdata.Metrics {
dp := []metricdata.DataPoint[int64]{
{
Attributes: set(nil),
Value: success,
},
}
if err != nil {
dp = append(dp, metricdata.DataPoint[int64]{
Attributes: set(err),
Value: total - success,
})
}
exported := otelconv.SDKExporterLogExported{}
return metricdata.Metrics{
Name: exported.Name(),
Description: exported.Description(),
Unit: exported.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: dp,
},
}
}
func operationDurationMetric(err error, code int) metricdata.Metrics {
attrs := []attribute.KeyValue{
semconv.OTelComponentName(GetComponentName(ID)),
semconv.OTelComponentTypeOtlpHTTPLogExporter,
semconv.HTTPResponseStatusCode(code),
}
attrs = append(attrs, ServerAddrAttrs(TARGET)...)
if err != nil {
attrs = append(attrs, semconv.ErrorType(err))
}
operation := otelconv.SDKExporterOperationDuration{}
return metricdata.Metrics{
Name: operation.Name(),
Description: operation.Description(),
Unit: operation.Unit(),
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: attribute.NewSet(attrs...),
},
},
},
}
}
func setup(t *testing.T) (*Instrumentation, func() metricdata.ScopeMetrics) {
t.Helper()
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
original := otel.GetMeterProvider()
t.Cleanup(func() {
otel.SetMeterProvider(original)
})
r := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(r))
otel.SetMeterProvider(mp)
inst, err := NewInstrumentation(ID, TARGET)
require.NoError(t, err)
require.NotNil(t, inst)
return inst, func() metricdata.ScopeMetrics {
var rm metricdata.ResourceMetrics
require.NoError(t, r.Collect(t.Context(), &rm))
require.Len(t, rm.ScopeMetrics, 1)
return rm.ScopeMetrics[0]
}
}
var Scope = instrumentation.Scope{
Name: ScopeName,
Version: Version,
SchemaURL: semconv.SchemaURL,
}
func assertMetrics(
t *testing.T,
got metricdata.ScopeMetrics,
count int64,
success int64,
err error,
code int,
) {
t.Helper()
assert.Equal(t, Scope, got.Scope, "unexpected scope")
m := got.Metrics
require.Len(t, m, 3, "expected 3 metrics")
o := metricdatatest.IgnoreTimestamp()
want := inflightMetric()
metricdatatest.AssertEqual(t, want, m[0], o)
want = exportedMetric(err, count, success)
metricdatatest.AssertEqual(t, want, m[1], o)
want = operationDurationMetric(err, code)
metricdatatest.AssertEqual(t, want, m[2], metricdatatest.IgnoreValue(), o)
}
func TestInstrumentationExportedLogs(t *testing.T) {
inst, collect := setup(t)
const n = 10
inst.ExportLogs(t.Context(), n).End(nil, http.StatusOK)
assertMetrics(t, collect(), n, n, nil, http.StatusOK)
}
func TestInstrumentationExportLogsPartialErrors(t *testing.T) {
inst, collect := setup(t)
const n = 10
const success = 5
err := internal.PartialSuccess{RejectedItems: n - success}
inst.ExportLogs(t.Context(), n).End(err, http.StatusPartialContent)
assertMetrics(t, collect(), n, success, err, http.StatusPartialContent)
}
func TestInstrumentationExportLogAllErrors(t *testing.T) {
inst, collect := setup(t)
const n = 10
const success = 0
inst.ExportLogs(t.Context(), n).End(assert.AnError, http.StatusUnauthorized)
assertMetrics(t, collect(), n, success, assert.AnError, http.StatusUnauthorized)
}
func TestInstrumentationExportLogsInvalidPartialErrored(t *testing.T) {
inst, collect := setup(t)
const n = 10
err := internal.PartialSuccess{RejectedItems: -5}
inst.ExportLogs(t.Context(), n).End(err, http.StatusPartialContent)
success := n
assertMetrics(t, collect(), n, int64(success), err, http.StatusPartialContent)
err.RejectedItems = n + 5
inst.ExportLogs(t.Context(), n).End(err, http.StatusPartialContent)
success += 0
assertMetrics(t, collect(), n+n, int64(success), err, http.StatusPartialContent)
}
func TestSetPresetAttrs(t *testing.T) {
tests := []struct {
endpoint string
host string
port int
}{
// Empty.
{endpoint: "", host: "", port: -1},
// Only a port.
{endpoint: ":4318", host: "", port: 4318},
// Hostname.
{endpoint: "localhost:4318", host: "localhost", port: 4318},
{endpoint: "localhost", host: "localhost", port: -1},
// IPv4 address.
{endpoint: "127.0.0.1:4318", host: "127.0.0.1", port: 4318},
{endpoint: "127.0.0.1", host: "127.0.0.1", port: -1},
// IPv6 address.
{endpoint: "2001:0db8:85a3:0000:0000:8a2e:0370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1},
{endpoint: "2001:db8:85a3:0:0:8a2e:370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1},
{endpoint: "2001:db8:85a3::8a2e:370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1},
{endpoint: "[2001:db8:85a3::8a2e:370:7334]", host: "2001:db8:85a3::8a2e:370:7334", port: -1},
{endpoint: "[::1]:9090", host: "::1", port: 9090},
// Port edge cases.
{endpoint: "example.com:0", host: "example.com", port: 0},
{endpoint: "example.com:65535", host: "example.com", port: 65535},
// Case insensitive.
{endpoint: "ExAmPlE.COM:8080", host: "ExAmPlE.COM", port: 8080},
}
for _, tt := range tests {
got := setPresetAttrs(GetComponentName(ID), tt.endpoint)
want := []attribute.KeyValue{
semconv.OTelComponentName(GetComponentName(ID)),
semconv.OTelComponentTypeOtlpHTTPLogExporter,
}
if tt.host != "" {
want = append(want, semconv.ServerAddress(tt.host))
}
if tt.port != -1 {
want = append(want, semconv.ServerPort(tt.port))
}
assert.Equal(t, want, got)
}
}
type logSink struct {
logr.LogSink
level int
msg string
keysAndValues []any
}
func (*logSink) Enabled(int) bool { return true }
func (l *logSink) Info(level int, msg string, keysAndValues ...any) {
l.level, l.msg, l.keysAndValues = level, msg, keysAndValues
l.LogSink.Info(level, msg, keysAndValues...)
}
func TestSetPresetAttrsError(t *testing.T) {
endpoints := []string{
"example.com:invalid", // Non-numeric port.
"example.com:8080:9090", // Multiple colons in port.
"example.com:99999", // Port out of range.
"example.com:-1", // Port out of range.
}
for _, endpoint := range endpoints {
l := &logSink{LogSink: testr.New(t).GetSink()}
t.Cleanup(func(orig logr.Logger) func() {
global.SetLogger(logr.New(l))
return func() { global.SetLogger(orig) }
}(global.GetLogger()))
// Set the logger as global so BaseAttrs can log the error.
got := setPresetAttrs(GetComponentName(ID), endpoint)
want := []attribute.KeyValue{
semconv.OTelComponentName(GetComponentName(ID)),
semconv.OTelComponentTypeOtlpHTTPLogExporter,
}
assert.Equal(t, want, got)
assert.Equal(t, 8, l.level, "expected Debug log level")
assert.Equal(t, "failed to parse target", l.msg)
}
}
func BenchmarkInstrumentationExportLogs(b *testing.B) {
setup := func(b *testing.B) *Instrumentation {
b.Helper()
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
inst, err := NewInstrumentation(ID, TARGET)
if err != nil {
b.Fatalf("failed to create instrumentation: %v", err)
}
return inst
}
run := func(err error, statusCode int) func(*testing.B) {
return func(b *testing.B) {
inst := setup(b)
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
inst.ExportLogs(b.Context(), 10).End(err, statusCode)
}
}
}
b.Run("NoError", run(nil, http.StatusOK))
err := &internal.PartialSuccess{RejectedItems: 6}
b.Run("PartialError", run(err, http.StatusOK))
b.Run("FullError", run(assert.AnError, http.StatusInternalServerError))
}

View File

@@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal"
import "fmt"
// PartialSuccess represents the underlying error for all handling
// OTLP partial success messages. Use `errors.Is(err,
// PartialSuccess{})` to test whether an error passed to the OTel
// error handler belongs to this category.
type PartialSuccess struct {
ErrorMessage string
RejectedItems int64
RejectedKind string
}
var _ error = PartialSuccess{}
// Error implements the error interface.
func (ps PartialSuccess) Error() string {
msg := ps.ErrorMessage
if msg == "" {
msg = "empty message"
}
return fmt.Sprintf("OTLP partial success: %s (%d %s rejected)", msg, ps.RejectedItems, ps.RejectedKind)
}
// Is supports the errors.Is() interface.
func (PartialSuccess) Is(err error) bool {
_, ok := err.(PartialSuccess)
return ok
}
// LogPartialSuccessError returns an error describing a partial success.
func LogPartialSuccessError(itemsRejected int64, errorMessage string) error {
return PartialSuccess{
ErrorMessage: errorMessage,
RejectedItems: itemsRejected,
RejectedKind: "logs",
}
}

View File

@@ -0,0 +1,34 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal
import (
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func requireErrorString(t *testing.T, expect string, err error) {
t.Helper()
require.Error(t, err)
require.ErrorIs(t, err, PartialSuccess{})
const pfx = "OTLP partial success: "
msg := err.Error()
require.True(t, strings.HasPrefix(msg, pfx))
require.Equal(t, expect, msg[len(pfx):])
}
func TestPartialSuccessFormat(t *testing.T) {
requireErrorString(t, "empty message (0 logs rejected)", LogPartialSuccessError(0, ""))
requireErrorString(t, "help help (0 logs rejected)", LogPartialSuccessError(0, "help help"))
requireErrorString(
t,
"what happened (10 logs rejected)",
LogPartialSuccessError(10, "what happened"),
)
requireErrorString(t, "what happened (15 logs rejected)", LogPartialSuccessError(15, "what happened"))
}

View File

@@ -0,0 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal"
// Version is the current release version of the OpenTelemetry OTLP over HTTP/protobuf logs exporter in use.
const Version = "0.14.0"

View File

@@ -0,0 +1,36 @@
# Experimental Features
The `otlploghttp` exporter contains features that have not yet stabilized in the OpenTelemetry specification.
These features are added to the `otlploghttp` exporter prior to stabilization in the specification so that users can start experimenting with them and provide feedback.
These features may change in backwards incompatible ways as feedback is applied.
See the [Compatibility and Stability](#compatibility-and-stability) section for more information.
## Features
- [Observability](#observability)
### Observability
The `otlploghttp` exporter can be configured to provide observability about itself using OpenTelemetry metrics.
To opt-in, set the environment variable `OTEL_GO_X_OBSERVABILITY` to `true`.
When enabled, the exporter will create the following metrics using the global `MeterProvider`:
- `otel.sdk.exporter.log.inflight`
- `otel.sdk.exporter.log.exported`
- `otel.sdk.exporter.operation.duration`
Please see the [Semantic conventions for OpenTelemetry SDK metrics] documentation for more details on these metrics.
[Semantic conventions for OpenTelemetry SDK metrics]: https://github.com/open-telemetry/semantic-conventions/blob/v1.36.0/docs/otel/sdk-metrics.md
## Compatibility and Stability
Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../../../VERSIONING.md).
These features may be removed or modified in successive version releases, including patch versions.
When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release.
There is no guarantee that any environment variable feature flags that enabled the experimental feature will be supported by the stable version.
If they are supported, they may be accompanied with a deprecation notice stating a timeline for the removal of that support.

View File

@@ -0,0 +1,22 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package x // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/x"
import "strings"
// Observability is an experimental feature flag that determines if exporter
// observability metrics are enabled.
//
// To enable this feature set the OTEL_GO_X_OBSERVABILITY environment variable
// to the case-insensitive string value of "true" (i.e. "True" and "TRUE"
// will also enable this).
var Observability = newFeature(
[]string{"OBSERVABILITY"},
func(v string) (string, bool) {
if strings.EqualFold(v, "true") {
return v, true
}
return "", false
},
)

View File

@@ -0,0 +1,21 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package x
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestObservability(t *testing.T) {
const key = "OTEL_GO_X_OBSERVABILITY"
require.Contains(t, Observability.Keys(), key)
t.Run("100", run(setenv(key, "100"), assertDisabled(Observability)))
t.Run("true", run(setenv(key, "true"), assertEnabled(Observability, "true")))
t.Run("True", run(setenv(key, "True"), assertEnabled(Observability, "True")))
t.Run("false", run(setenv(key, "false"), assertDisabled(Observability)))
t.Run("empty", run(assertDisabled(Observability)))
}

View File

@@ -0,0 +1,58 @@
// Code generated by gotmpl. DO NOT MODIFY.
// source: internal/shared/x/x.go.tmpl
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package x documents experimental features for [go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp].
package x // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/x"
import (
"os"
)
// Feature is an experimental feature control flag. It provides a uniform way
// to interact with these feature flags and parse their values.
type Feature[T any] struct {
keys []string
parse func(v string) (T, bool)
}
func newFeature[T any](suffix []string, parse func(string) (T, bool)) Feature[T] {
const envKeyRoot = "OTEL_GO_X_"
keys := make([]string, 0, len(suffix))
for _, s := range suffix {
keys = append(keys, envKeyRoot+s)
}
return Feature[T]{
keys: keys,
parse: parse,
}
}
// Keys returns the environment variable keys that can be set to enable the
// feature.
func (f Feature[T]) Keys() []string { return f.keys }
// Lookup returns the user configured value for the feature and true if the
// user has enabled the feature. Otherwise, if the feature is not enabled, a
// zero-value and false are returned.
func (f Feature[T]) Lookup() (v T, ok bool) {
// https://github.com/open-telemetry/opentelemetry-specification/blob/62effed618589a0bec416a87e559c0a9d96289bb/specification/configuration/sdk-environment-variables.md#parsing-empty-value
//
// > The SDK MUST interpret an empty value of an environment variable the
// > same way as when the variable is unset.
for _, key := range f.keys {
vRaw := os.Getenv(key)
if vRaw != "" {
return f.parse(vRaw)
}
}
return v, ok
}
// Enabled reports whether the feature is enabled.
func (f Feature[T]) Enabled() bool {
_, ok := f.Lookup()
return ok
}

View File

@@ -0,0 +1,75 @@
// Code generated by gotmpl. DO NOT MODIFY.
// source: internal/shared/x/x_text.go.tmpl
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package x
import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
mockKey = "OTEL_GO_X_MOCK_FEATURE"
mockKey2 = "OTEL_GO_X_MOCK_FEATURE2"
)
var mockFeature = newFeature([]string{"MOCK_FEATURE", "MOCK_FEATURE2"}, func(v string) (string, bool) {
if strings.EqualFold(v, "true") {
return v, true
}
return "", false
})
func TestFeature(t *testing.T) {
require.Contains(t, mockFeature.Keys(), mockKey)
require.Contains(t, mockFeature.Keys(), mockKey2)
t.Run("100", run(setenv(mockKey, "100"), assertDisabled(mockFeature)))
t.Run("true", run(setenv(mockKey, "true"), assertEnabled(mockFeature, "true")))
t.Run("True", run(setenv(mockKey, "True"), assertEnabled(mockFeature, "True")))
t.Run("false", run(setenv(mockKey, "false"), assertDisabled(mockFeature)))
t.Run("empty", run(assertDisabled(mockFeature)))
}
func run(steps ...func(*testing.T)) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
for _, step := range steps {
step(t)
}
}
}
func setenv(k, v string) func(t *testing.T) { //nolint:unparam // This is a reusable test utility function.
return func(t *testing.T) { t.Setenv(k, v) }
}
func assertEnabled[T any](f Feature[T], want T) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
assert.True(t, f.Enabled(), "not enabled")
v, ok := f.Lookup()
assert.True(t, ok, "Lookup state")
assert.Equal(t, want, v, "Lookup value")
}
}
func assertDisabled[T any](f Feature[T]) func(*testing.T) {
var zero T
return func(t *testing.T) {
t.Helper()
assert.False(t, f.Enabled(), "enabled")
v, ok := f.Lookup()
assert.False(t, ok, "Lookup state")
assert.Equal(t, zero, v, "Lookup value")
}
}

View File

@@ -58,3 +58,6 @@ modules:
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp:
version-refs:
- ./exporters/otlp/otlptrace/otlptracehttp/internal/version.go
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp:
version-refs:
- ./exporters/otlp/otlplog/otlploghttp/internal/version.go