mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-03-29 21:47:00 +02:00
Use already enabled revive linter and add depguard (#2883)
* Refactor golangci-lint conf Order settings alphabetically. * Add revive settings to golangci conf * Check blank imports * Check bool-literal-in-expr * Check constant-logical-expr * Check context-as-argument * Check context-key-type * Check deep-exit * Check defer * Check dot-imports * Check duplicated-imports * Check early-return * Check empty-block * Check empty-lines * Check error-naming * Check error-return * Check error-strings * Check errorf * Stop ignoring context first arg in tests * Check exported comments * Check flag-parameter * Check identical branches * Check if-return * Check increment-decrement * Check indent-error-flow * Check deny list of go imports * Check import shadowing * Check package comments * Check range * Check range val in closure * Check range val address * Check redefines builtin id * Check string-format * Check struct tag * Check superfluous else * Check time equal * Check var naming * Check var declaration * Check unconditional recursion * Check unexported return * Check unhandled errors * Check unnecessary stmt * Check unnecessary break * Check waitgroup by value * Exclude deep-exit check in example*_test.go files
This commit is contained in:
parent
c5809aa8c7
commit
1f5b159161
.golangci.ymlhandler.gohandler_test.go
attribute
baggage
bridge
opencensus
opentracing
example
namedtracer
opencensus
otel-collector
passthrough
prometheus
zipkin
exporters
jaeger
otlp
otlpmetric
otlptrace
internal
otlptracegrpc
otlptracehttp
prometheus
stdout
stdoutmetric
stdouttrace
zipkin
internal
metric
sdk
internal/env
metric
resource
trace
216
.golangci.yml
216
.golangci.yml
@ -10,12 +10,13 @@ linters:
|
||||
# Specifically enable linters we want to use.
|
||||
enable:
|
||||
- deadcode
|
||||
- depguard
|
||||
- errcheck
|
||||
- godot
|
||||
- gofmt
|
||||
- goimports
|
||||
- gosimple
|
||||
- govet
|
||||
- godot
|
||||
- ineffassign
|
||||
- misspell
|
||||
- revive
|
||||
@ -25,30 +26,221 @@ linters:
|
||||
- unused
|
||||
- varcheck
|
||||
|
||||
|
||||
issues:
|
||||
# Maximum issues count per one linter.
|
||||
# Set to 0 to disable.
|
||||
# Default: 50
|
||||
# Setting to unlimited so the linter only is run once to debug all issues.
|
||||
max-issues-per-linter: 0
|
||||
# Maximum count of issues with the same text.
|
||||
# Set to 0 to disable.
|
||||
# Default: 3
|
||||
# Setting to unlimited so the linter only is run once to debug all issues.
|
||||
max-same-issues: 0
|
||||
# Excluding configuration per-path, per-linter, per-text and per-source.
|
||||
exclude-rules:
|
||||
# helpers in tests often (rightfully) pass a *testing.T as their first argument
|
||||
- path: _test\.go
|
||||
text: "context.Context should be the first parameter of a function"
|
||||
# TODO: Having appropriate comments for exported objects helps development,
|
||||
# even for objects in internal packages. Appropriate comments for all
|
||||
# exported objects should be added and this exclusion removed.
|
||||
- path: '.*internal/.*'
|
||||
text: "exported (method|function|type|const) (.+) should have comment or be unexported"
|
||||
linters:
|
||||
- revive
|
||||
# Yes, they are, but it's okay in a test
|
||||
# Yes, they are, but it's okay in a test.
|
||||
- path: _test\.go
|
||||
text: "exported func.*returns unexported type.*which can be annoying to use"
|
||||
linters:
|
||||
- revive
|
||||
# Example test functions should be treated like main.
|
||||
- path: example.*_test\.go
|
||||
text: "calls to (.+) only in main[(][)] or init[(][)] functions"
|
||||
linters:
|
||||
- revive
|
||||
include:
|
||||
# revive exported should have comment or be unexported.
|
||||
- EXC0012
|
||||
# revive package comment should be of the form ...
|
||||
- EXC0013
|
||||
|
||||
linters-settings:
|
||||
misspell:
|
||||
locale: US
|
||||
ignore-words:
|
||||
- cancelled
|
||||
goimports:
|
||||
local-prefixes: go.opentelemetry.io
|
||||
depguard:
|
||||
# Check the list against standard lib.
|
||||
# Default: false
|
||||
include-go-root: true
|
||||
# A list of packages for the list type specified.
|
||||
# Default: []
|
||||
packages:
|
||||
- "crypto/md5"
|
||||
- "crypto/sha1"
|
||||
- "crypto/**/pkix"
|
||||
ignore-file-rules:
|
||||
- "**/*_test.go"
|
||||
additional-guards:
|
||||
# Do not allow testing packages in non-test files.
|
||||
- list-type: denylist
|
||||
include-go-root: true
|
||||
packages:
|
||||
- testing
|
||||
- github.com/stretchr/testify
|
||||
ignore-file-rules:
|
||||
- "**/*_test.go"
|
||||
- "**/*test/*.go"
|
||||
- "**/internal/matchers/*.go"
|
||||
godot:
|
||||
exclude:
|
||||
# Exclude sentence fragments for lists.
|
||||
- '^[ ]*[-•]'
|
||||
# Exclude sentences prefixing a list.
|
||||
- ':$'
|
||||
goimports:
|
||||
local-prefixes: go.opentelemetry.io
|
||||
misspell:
|
||||
locale: US
|
||||
ignore-words:
|
||||
- cancelled
|
||||
revive:
|
||||
# Sets the default failure confidence.
|
||||
# This means that linting errors with less than 0.8 confidence will be ignored.
|
||||
# Default: 0.8
|
||||
confidence: 0.01
|
||||
rules:
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#blank-imports
|
||||
- name: blank-imports
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#bool-literal-in-expr
|
||||
- name: bool-literal-in-expr
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#constant-logical-expr
|
||||
- name: constant-logical-expr
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#context-as-argument
|
||||
- name: context-as-argument
|
||||
disabled: false
|
||||
arguments:
|
||||
allowTypesBefore: "*testing.T"
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#context-keys-type
|
||||
- name: context-keys-type
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#deep-exit
|
||||
- name: deep-exit
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#defer
|
||||
- name: defer
|
||||
disabled: false
|
||||
arguments:
|
||||
- ["call-chain", "loop"]
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#dot-imports
|
||||
- name: dot-imports
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#duplicated-imports
|
||||
- name: duplicated-imports
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#early-return
|
||||
- name: early-return
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#empty-block
|
||||
- name: empty-block
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#empty-lines
|
||||
- name: empty-lines
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#error-naming
|
||||
- name: error-naming
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#error-return
|
||||
- name: error-return
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#error-strings
|
||||
- name: error-strings
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#errorf
|
||||
- name: errorf
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#exported
|
||||
- name: exported
|
||||
disabled: false
|
||||
arguments:
|
||||
- "sayRepetitiveInsteadOfStutters"
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#flag-parameter
|
||||
- name: flag-parameter
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#identical-branches
|
||||
- name: identical-branches
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#if-return
|
||||
- name: if-return
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#increment-decrement
|
||||
- name: increment-decrement
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#indent-error-flow
|
||||
- name: indent-error-flow
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#import-shadowing
|
||||
- name: import-shadowing
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#package-comments
|
||||
- name: package-comments
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#range
|
||||
- name: range
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#range-val-in-closure
|
||||
- name: range-val-in-closure
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#range-val-address
|
||||
- name: range-val-address
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#redefines-builtin-id
|
||||
- name: redefines-builtin-id
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#string-format
|
||||
- name: string-format
|
||||
disabled: false
|
||||
arguments:
|
||||
- - panic
|
||||
- '/^[^\n]*$/'
|
||||
- must not contain line breaks
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#struct-tag
|
||||
- name: struct-tag
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#superfluous-else
|
||||
- name: superfluous-else
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#time-equal
|
||||
- name: time-equal
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#var-naming
|
||||
- name: var-naming
|
||||
disabled: false
|
||||
arguments:
|
||||
- ["ID"] # AllowList
|
||||
- ["Otel", "Aws", "Gcp"] # DenyList
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#var-declaration
|
||||
- name: var-declaration
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#unconditional-recursion
|
||||
- name: unconditional-recursion
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#unexported-return
|
||||
- name: unexported-return
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#unhandled-error
|
||||
- name: unhandled-error
|
||||
disabled: false
|
||||
arguments:
|
||||
- "fmt.Fprint"
|
||||
- "fmt.Fprintf"
|
||||
- "fmt.Fprintln"
|
||||
- "fmt.Print"
|
||||
- "fmt.Printf"
|
||||
- "fmt.Println"
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#unnecessary-stmt
|
||||
- name: unnecessary-stmt
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#useless-break
|
||||
- name: useless-break
|
||||
disabled: false
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#waitgroup-by-value
|
||||
- name: waitgroup-by-value
|
||||
disabled: false
|
||||
|
@ -133,9 +133,9 @@ func copyAndEscape(buf *bytes.Buffer, val string) {
|
||||
for _, ch := range val {
|
||||
switch ch {
|
||||
case '=', ',', escapeChar:
|
||||
buf.WriteRune(escapeChar)
|
||||
_, _ = buf.WriteRune(escapeChar)
|
||||
}
|
||||
buf.WriteRune(ch)
|
||||
_, _ = buf.WriteRune(ch)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -56,7 +56,6 @@ func TestEmptyIterator(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMergedIterator(t *testing.T) {
|
||||
|
||||
type inputs struct {
|
||||
name string
|
||||
keys1 []string
|
||||
|
@ -71,8 +71,8 @@ func EmptySet() *Set {
|
||||
return emptySet
|
||||
}
|
||||
|
||||
// reflect abbreviates reflect.ValueOf.
|
||||
func (d Distinct) reflect() reflect.Value {
|
||||
// reflectValue abbreviates reflect.ValueOf(d).
|
||||
func (d Distinct) reflectValue() reflect.Value {
|
||||
return reflect.ValueOf(d.iface)
|
||||
}
|
||||
|
||||
@ -86,7 +86,7 @@ func (l *Set) Len() int {
|
||||
if l == nil || !l.equivalent.Valid() {
|
||||
return 0
|
||||
}
|
||||
return l.equivalent.reflect().Len()
|
||||
return l.equivalent.reflectValue().Len()
|
||||
}
|
||||
|
||||
// Get returns the KeyValue at ordered position idx in this set.
|
||||
@ -94,7 +94,7 @@ func (l *Set) Get(idx int) (KeyValue, bool) {
|
||||
if l == nil {
|
||||
return KeyValue{}, false
|
||||
}
|
||||
value := l.equivalent.reflect()
|
||||
value := l.equivalent.reflectValue()
|
||||
|
||||
if idx >= 0 && idx < value.Len() {
|
||||
// Note: The Go compiler successfully avoids an allocation for
|
||||
@ -110,7 +110,7 @@ func (l *Set) Value(k Key) (Value, bool) {
|
||||
if l == nil {
|
||||
return Value{}, false
|
||||
}
|
||||
rValue := l.equivalent.reflect()
|
||||
rValue := l.equivalent.reflectValue()
|
||||
vlen := rValue.Len()
|
||||
|
||||
idx := sort.Search(vlen, func(idx int) bool {
|
||||
|
@ -25,7 +25,7 @@ import (
|
||||
//go:generate stringer -type=Type
|
||||
|
||||
// Type describes the type of the data Value holds.
|
||||
type Type int
|
||||
type Type int // nolint: revive // redefines builtin Type.
|
||||
|
||||
// Value represents the value part in key-value pairs.
|
||||
type Value struct {
|
||||
|
@ -68,6 +68,9 @@ type Property struct {
|
||||
hasData bool
|
||||
}
|
||||
|
||||
// NewKeyProperty returns a new Property for key.
|
||||
//
|
||||
// If key is invalid, an error will be returned.
|
||||
func NewKeyProperty(key string) (Property, error) {
|
||||
if !keyRe.MatchString(key) {
|
||||
return newInvalidProperty(), fmt.Errorf("%w: %q", errInvalidKey, key)
|
||||
@ -77,6 +80,9 @@ func NewKeyProperty(key string) (Property, error) {
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// NewKeyValueProperty returns a new Property for key with value.
|
||||
//
|
||||
// If key or value are invalid, an error will be returned.
|
||||
func NewKeyValueProperty(key, value string) (Property, error) {
|
||||
if !keyRe.MatchString(key) {
|
||||
return newInvalidProperty(), fmt.Errorf("%w: %q", errInvalidKey, key)
|
||||
|
@ -612,7 +612,7 @@ func TestBaggageMembers(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
baggage := Baggage{list: baggage.List{
|
||||
bag := Baggage{list: baggage.List{
|
||||
"foo": {
|
||||
Value: "1",
|
||||
Properties: []baggage.Property{
|
||||
@ -626,13 +626,13 @@ func TestBaggageMembers(t *testing.T) {
|
||||
},
|
||||
}}
|
||||
|
||||
assert.ElementsMatch(t, members, baggage.Members())
|
||||
assert.ElementsMatch(t, members, bag.Members())
|
||||
}
|
||||
|
||||
func TestBaggageMember(t *testing.T) {
|
||||
baggage := Baggage{list: baggage.List{"foo": {Value: "1"}}}
|
||||
assert.Equal(t, Member{key: "foo", value: "1"}, baggage.Member("foo"))
|
||||
assert.Equal(t, Member{}, baggage.Member("bar"))
|
||||
bag := Baggage{list: baggage.List{"foo": {Value: "1"}}}
|
||||
assert.Equal(t, Member{key: "foo", value: "1"}, bag.Member("foo"))
|
||||
assert.Equal(t, Member{}, bag.Member("bar"))
|
||||
}
|
||||
|
||||
func TestMemberKey(t *testing.T) {
|
||||
|
@ -276,7 +276,7 @@ func TestHistogramAggregation(t *testing.T) {
|
||||
if output.Kind() != aggregation.HistogramKind {
|
||||
t.Errorf("recordAggregationsFromPoints(%v) = %v, want %v", input, output.Kind(), aggregation.HistogramKind)
|
||||
}
|
||||
if end != now {
|
||||
if !end.Equal(now) {
|
||||
t.Errorf("recordAggregationsFromPoints(%v).end() = %v, want %v", input, end, now)
|
||||
}
|
||||
distAgg, ok := output.(aggregation.Histogram)
|
||||
|
@ -37,7 +37,7 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
var errConversion = errors.New("Unable to convert from OpenCensus to OpenTelemetry")
|
||||
var errConversion = errors.New("unable to convert from OpenCensus to OpenTelemetry")
|
||||
|
||||
// NewMetricExporter returns an OpenCensus exporter that exports to an
|
||||
// OpenTelemetry exporter.
|
||||
|
@ -137,7 +137,6 @@ func TestToFromContext(t *testing.T) {
|
||||
// Get the opentelemetry span using the OpenCensus FromContext, and end it
|
||||
otSpan2 := octrace.FromContext(ctx)
|
||||
defer otSpan2.End()
|
||||
|
||||
}()
|
||||
|
||||
spans := sr.Ended()
|
||||
|
@ -349,10 +349,14 @@ func (t *BridgeTracer) SetOpenTelemetryTracer(tracer trace.Tracer) {
|
||||
t.setTracer.isSet = true
|
||||
}
|
||||
|
||||
// SetTextMapPropagator sets propagator as the TextMapPropagator to use by the
|
||||
// BridgeTracer.
|
||||
func (t *BridgeTracer) SetTextMapPropagator(propagator propagation.TextMapPropagator) {
|
||||
t.propagator = propagator
|
||||
}
|
||||
|
||||
// NewHookedContext returns a Context that has ctx as its parent and is
|
||||
// wrapped to handle baggage set and get operations.
|
||||
func (t *BridgeTracer) NewHookedContext(ctx context.Context) context.Context {
|
||||
ctx = iBaggage.ContextWithSetHook(ctx, t.baggageSetHook)
|
||||
ctx = iBaggage.ContextWithGetHook(ctx, t.baggageGetHook)
|
||||
@ -671,9 +675,9 @@ func (t *BridgeTracer) Extract(format interface{}, carrier interface{}) (ot.Span
|
||||
}
|
||||
header := http.Header(hhcarrier)
|
||||
ctx := t.getPropagator().Extract(context.Background(), propagation.HeaderCarrier(header))
|
||||
baggage := baggage.FromContext(ctx)
|
||||
bag := baggage.FromContext(ctx)
|
||||
bridgeSC := &bridgeSpanContext{
|
||||
bag: baggage,
|
||||
bag: bag,
|
||||
otelSpanContext: trace.SpanContextFromContext(ctx),
|
||||
}
|
||||
if !bridgeSC.otelSpanContext.IsValid() {
|
||||
|
@ -155,7 +155,7 @@ func (t *MockTracer) getRandSpanID() trace.SpanID {
|
||||
defer t.randLock.Unlock()
|
||||
|
||||
sid := trace.SpanID{}
|
||||
t.rand.Read(sid[:])
|
||||
_, _ = t.rand.Read(sid[:])
|
||||
|
||||
return sid
|
||||
}
|
||||
@ -165,7 +165,7 @@ func (t *MockTracer) getRandTraceID() trace.TraceID {
|
||||
defer t.randLock.Unlock()
|
||||
|
||||
tid := trace.TraceID{}
|
||||
t.rand.Read(tid[:])
|
||||
_, _ = t.rand.Read(tid[:])
|
||||
|
||||
return tid
|
||||
}
|
||||
|
@ -33,6 +33,9 @@ func NewTracerPair(tracer trace.Tracer) (*BridgeTracer, *WrapperTracerProvider)
|
||||
return bridgeTracer, wrapperProvider
|
||||
}
|
||||
|
||||
// NewTracerPairWithContext is a convience function. It calls NewTracerPair
|
||||
// and returns a hooked version of ctx with the created BridgeTracer along
|
||||
// with the BridgeTracer and WrapperTracerProvider.
|
||||
func NewTracerPairWithContext(ctx context.Context, tracer trace.Tracer) (context.Context, *BridgeTracer, *WrapperTracerProvider) {
|
||||
bridgeTracer, wrapperProvider := NewTracerPair(tracer)
|
||||
ctx = bridgeTracer.NewHookedContext(ctx)
|
||||
|
@ -21,6 +21,8 @@ import (
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// WrapperTracerProvider is an OpenTelemetry TracerProvider that wraps an
|
||||
// OpenTracing Tracer.
|
||||
type WrapperTracerProvider struct {
|
||||
wTracer *WrapperTracer
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/go-logr/stdr"
|
||||
@ -38,12 +39,10 @@ var (
|
||||
var tp *sdktrace.TracerProvider
|
||||
|
||||
// initTracer creates and registers trace provider instance.
|
||||
func initTracer() {
|
||||
var err error
|
||||
func initTracer() error {
|
||||
exp, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
|
||||
if err != nil {
|
||||
log.Panicf("failed to initialize stdouttrace exporter %v\n", err)
|
||||
return
|
||||
return fmt.Errorf("failed to initialize stdouttrace exporter: %w", err)
|
||||
}
|
||||
bsp := sdktrace.NewBatchSpanProcessor(exp)
|
||||
tp = sdktrace.NewTracerProvider(
|
||||
@ -51,6 +50,7 @@ func initTracer() {
|
||||
sdktrace.WithSpanProcessor(bsp),
|
||||
)
|
||||
otel.SetTracerProvider(tp)
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
@ -58,7 +58,9 @@ func main() {
|
||||
stdr.SetVerbosity(5)
|
||||
|
||||
// initialize trace provider.
|
||||
initTracer()
|
||||
if err := initTracer(); err != nil {
|
||||
log.Panic(err)
|
||||
}
|
||||
|
||||
// Create a named tracer with package path as its name.
|
||||
tracer := tp.Tracer("example/namedtracer/main")
|
||||
|
@ -62,7 +62,9 @@ func main() {
|
||||
log.Fatal(fmt.Errorf("error creating metric exporter: %w", err))
|
||||
}
|
||||
tracing(traceExporter)
|
||||
monitoring(metricsExporter)
|
||||
if err := monitoring(metricsExporter); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// tracing demonstrates overriding the OpenCensus DefaultTracer to send spans
|
||||
@ -100,18 +102,18 @@ func tracing(otExporter sdktrace.SpanExporter) {
|
||||
// monitoring demonstrates creating an IntervalReader using the OpenTelemetry
|
||||
// exporter to send metrics to the exporter by using either an OpenCensus
|
||||
// registry or an OpenCensus view.
|
||||
func monitoring(otExporter export.Exporter) {
|
||||
func monitoring(otExporter export.Exporter) error {
|
||||
log.Println("Using the OpenTelemetry stdoutmetric exporter to export OpenCensus metrics. This allows routing telemetry from both OpenTelemetry and OpenCensus to a single exporter.")
|
||||
ocExporter := opencensus.NewMetricExporter(otExporter)
|
||||
intervalReader, err := metricexport.NewIntervalReader(&metricexport.Reader{}, ocExporter)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create interval reader: %v\n", err)
|
||||
return fmt.Errorf("failed to create interval reader: %w", err)
|
||||
}
|
||||
intervalReader.ReportingInterval = 10 * time.Second
|
||||
log.Println("Emitting metrics using OpenCensus APIs. These should be printed out using the OpenTelemetry stdoutmetric exporter.")
|
||||
err = intervalReader.Start()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to start interval reader: %v\n", err)
|
||||
return fmt.Errorf("failed to start interval reader: %w", err)
|
||||
}
|
||||
defer intervalReader.Stop()
|
||||
|
||||
@ -126,20 +128,20 @@ func monitoring(otExporter export.Exporter) {
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to add gauge: %v\n", err)
|
||||
return fmt.Errorf("failed to add gauge: %w", err)
|
||||
}
|
||||
entry, err := gauge.GetEntry()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to get gauge entry: %v\n", err)
|
||||
return fmt.Errorf("failed to get gauge entry: %w", err)
|
||||
}
|
||||
|
||||
log.Println("Registering a cumulative metric using an OpenCensus view.")
|
||||
if err := view.Register(countView); err != nil {
|
||||
log.Fatalf("Failed to register views: %v", err)
|
||||
return fmt.Errorf("failed to register views: %w", err)
|
||||
}
|
||||
ctx, err := tag.New(context.Background(), tag.Insert(keyType, "view"))
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to set tag: %v\n", err)
|
||||
return fmt.Errorf("failed to set tag: %w", err)
|
||||
}
|
||||
for i := int64(1); true; i++ {
|
||||
// update stats for our gauge
|
||||
@ -148,4 +150,5 @@ func monitoring(otExporter export.Exporter) {
|
||||
stats.Record(ctx, countMeasure.M(1))
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -21,6 +21,8 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
@ -38,7 +40,7 @@ import (
|
||||
|
||||
// Initializes an OTLP exporter, and configures the corresponding trace and
|
||||
// metric providers.
|
||||
func initProvider() func() {
|
||||
func initProvider() (func(context.Context) error, error) {
|
||||
ctx := context.Background()
|
||||
|
||||
res, err := resource.New(ctx,
|
||||
@ -47,7 +49,9 @@ func initProvider() func() {
|
||||
semconv.ServiceNameKey.String("test-service"),
|
||||
),
|
||||
)
|
||||
handleErr(err, "failed to create resource")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create resource: %w", err)
|
||||
}
|
||||
|
||||
// If the OpenTelemetry Collector is running on a local cluster (minikube or
|
||||
// microk8s), it should be accessible through the NodePort service at the
|
||||
@ -55,11 +59,15 @@ func initProvider() func() {
|
||||
// endpoint of your cluster. If you run the app inside k8s, then you can
|
||||
// probably connect directly to the service through dns
|
||||
conn, err := grpc.DialContext(ctx, "localhost:30080", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
|
||||
handleErr(err, "failed to create gRPC connection to collector")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
|
||||
}
|
||||
|
||||
// Set up a trace exporter
|
||||
traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
|
||||
handleErr(err, "failed to create trace exporter")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
|
||||
}
|
||||
|
||||
// Register the trace exporter with a TracerProvider, using a batch
|
||||
// span processor to aggregate spans before export.
|
||||
@ -74,17 +82,25 @@ func initProvider() func() {
|
||||
// set global propagator to tracecontext (the default is no-op).
|
||||
otel.SetTextMapPropagator(propagation.TraceContext{})
|
||||
|
||||
return func() {
|
||||
// Shutdown will flush any remaining spans and shut down the exporter.
|
||||
handleErr(tracerProvider.Shutdown(ctx), "failed to shutdown TracerProvider")
|
||||
}
|
||||
// Shutdown will flush any remaining spans and shut down the exporter.
|
||||
return tracerProvider.Shutdown, nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
log.Printf("Waiting for connection...")
|
||||
|
||||
shutdown := initProvider()
|
||||
defer shutdown()
|
||||
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||
defer cancel()
|
||||
|
||||
shutdown, err := initProvider()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
if err := shutdown(ctx); err != nil {
|
||||
log.Fatal("failed to shutdown TracerProvider: %w", err)
|
||||
}
|
||||
}()
|
||||
|
||||
tracer := otel.Tracer("test-tracer")
|
||||
|
||||
@ -98,7 +114,7 @@ func main() {
|
||||
|
||||
// work begins
|
||||
ctx, span := tracer.Start(
|
||||
context.Background(),
|
||||
ctx,
|
||||
"CollectorExporter-Example",
|
||||
trace.WithAttributes(commonAttrs...))
|
||||
defer span.End()
|
||||
@ -112,9 +128,3 @@ func main() {
|
||||
|
||||
log.Printf("Done!")
|
||||
}
|
||||
|
||||
func handleErr(err error, message string) {
|
||||
if err != nil {
|
||||
log.Fatalf("%s: %v", message, err)
|
||||
}
|
||||
}
|
||||
|
@ -34,6 +34,8 @@ type Handler struct {
|
||||
next func(r *http.Request)
|
||||
}
|
||||
|
||||
// New returns a new Handler that will trace requests before handing them off
|
||||
// to next.
|
||||
func New(next func(r *http.Request)) *Handler {
|
||||
// Like most instrumentation packages, this handler defaults to using the
|
||||
// global progatators and tracer providers.
|
||||
|
@ -16,6 +16,7 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
@ -32,7 +33,10 @@ func main() {
|
||||
ctx := context.Background()
|
||||
|
||||
initPassthroughGlobals()
|
||||
tp := nonGlobalTracer()
|
||||
tp, err := nonGlobalTracer()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer func() { _ = tp.Shutdown(ctx) }()
|
||||
|
||||
// make an initial http request
|
||||
@ -74,16 +78,15 @@ func initPassthroughGlobals() {
|
||||
|
||||
// nonGlobalTracer creates a trace provider instance for testing, but doesn't
|
||||
// set it as the global tracer provider.
|
||||
func nonGlobalTracer() *sdktrace.TracerProvider {
|
||||
var err error
|
||||
func nonGlobalTracer() (*sdktrace.TracerProvider, error) {
|
||||
exp, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
|
||||
if err != nil {
|
||||
log.Panicf("failed to initialize stdouttrace exporter %v\n", err)
|
||||
return nil, fmt.Errorf("failed to initialize stdouttrace exporter: %w", err)
|
||||
}
|
||||
bsp := sdktrace.NewBatchSpanProcessor(exp)
|
||||
tp := sdktrace.NewTracerProvider(
|
||||
sdktrace.WithSampler(sdktrace.AlwaysSample()),
|
||||
sdktrace.WithSpanProcessor(bsp),
|
||||
)
|
||||
return tp
|
||||
return tp, nil
|
||||
}
|
||||
|
@ -19,6 +19,8 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -37,7 +39,7 @@ var (
|
||||
lemonsKey = attribute.Key("ex.com/lemons")
|
||||
)
|
||||
|
||||
func initMeter() {
|
||||
func initMeter() error {
|
||||
config := prometheus.Config{
|
||||
DefaultHistogramBoundaries: []float64{1, 2, 5, 10, 20, 50},
|
||||
}
|
||||
@ -52,7 +54,7 @@ func initMeter() {
|
||||
)
|
||||
exporter, err := prometheus.New(config, c)
|
||||
if err != nil {
|
||||
log.Panicf("failed to initialize prometheus exporter %v", err)
|
||||
return fmt.Errorf("failed to initialize prometheus exporter: %w", err)
|
||||
}
|
||||
|
||||
global.SetMeterProvider(exporter.MeterProvider())
|
||||
@ -63,10 +65,13 @@ func initMeter() {
|
||||
}()
|
||||
|
||||
fmt.Println("Prometheus server running on :2222")
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
initMeter()
|
||||
if err := initMeter(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
meter := global.Meter("ex.com/basic")
|
||||
|
||||
@ -86,7 +91,7 @@ func main() {
|
||||
gaugeObserver.Observe(ctx, value, attrs...)
|
||||
})
|
||||
|
||||
histogram, err := meter.SyncFloat64().Histogram("ex.com.two")
|
||||
hist, err := meter.SyncFloat64().Histogram("ex.com.two")
|
||||
if err != nil {
|
||||
log.Panicf("failed to initialize instrument: %v", err)
|
||||
}
|
||||
@ -98,14 +103,15 @@ func main() {
|
||||
commonAttrs := []attribute.KeyValue{lemonsKey.Int(10), attribute.String("A", "1"), attribute.String("B", "2"), attribute.String("C", "3")}
|
||||
notSoCommonAttrs := []attribute.KeyValue{lemonsKey.Int(13)}
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||
defer stop()
|
||||
|
||||
(*observerLock).Lock()
|
||||
*observerValueToReport = 1.0
|
||||
*observerAttrsToReport = commonAttrs
|
||||
(*observerLock).Unlock()
|
||||
|
||||
histogram.Record(ctx, 2.0, commonAttrs...)
|
||||
hist.Record(ctx, 2.0, commonAttrs...)
|
||||
counter.Add(ctx, 12.0, commonAttrs...)
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
@ -114,7 +120,7 @@ func main() {
|
||||
*observerValueToReport = 1.0
|
||||
*observerAttrsToReport = notSoCommonAttrs
|
||||
(*observerLock).Unlock()
|
||||
histogram.Record(ctx, 2.0, notSoCommonAttrs...)
|
||||
hist.Record(ctx, 2.0, notSoCommonAttrs...)
|
||||
counter.Add(ctx, 22.0, notSoCommonAttrs...)
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
@ -123,10 +129,10 @@ func main() {
|
||||
*observerValueToReport = 13.0
|
||||
*observerAttrsToReport = commonAttrs
|
||||
(*observerLock).Unlock()
|
||||
histogram.Record(ctx, 12.0, commonAttrs...)
|
||||
hist.Record(ctx, 12.0, commonAttrs...)
|
||||
counter.Add(ctx, 13.0, commonAttrs...)
|
||||
|
||||
fmt.Println("Example finished updating, please visit :2222")
|
||||
|
||||
select {}
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"flag"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
@ -34,7 +35,7 @@ import (
|
||||
var logger = log.New(os.Stderr, "zipkin-example", log.Ldate|log.Ltime|log.Llongfile)
|
||||
|
||||
// initTracer creates a new trace provider instance and registers it as global trace provider.
|
||||
func initTracer(url string) func() {
|
||||
func initTracer(url string) (func(context.Context) error, error) {
|
||||
// Create Zipkin Exporter and install it as a global tracer.
|
||||
//
|
||||
// For demoing purposes, always sample. In a production application, you should
|
||||
@ -45,7 +46,7 @@ func initTracer(url string) func() {
|
||||
zipkin.WithLogger(logger),
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
batcher := sdktrace.NewBatchSpanProcessor(exporter)
|
||||
@ -59,19 +60,25 @@ func initTracer(url string) func() {
|
||||
)
|
||||
otel.SetTracerProvider(tp)
|
||||
|
||||
return func() {
|
||||
_ = tp.Shutdown(context.Background())
|
||||
}
|
||||
return tp.Shutdown, nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
url := flag.String("zipkin", "http://localhost:9411/api/v2/spans", "zipkin url")
|
||||
flag.Parse()
|
||||
|
||||
shutdown := initTracer(*url)
|
||||
defer shutdown()
|
||||
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||
defer cancel()
|
||||
|
||||
ctx := context.Background()
|
||||
shutdown, err := initTracer(*url)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
if err := shutdown(ctx); err != nil {
|
||||
log.Fatal("failed to shutdown TracerProvider: %w", err)
|
||||
}
|
||||
}()
|
||||
|
||||
tr := otel.GetTracerProvider().Tracer("component-main")
|
||||
ctx, span := tr.Start(ctx, "foo", trace.WithSpanKind(trace.SpanKindServer))
|
||||
|
@ -263,8 +263,8 @@ func keyValueToTag(keyValue attribute.KeyValue) *gen.Tag {
|
||||
attribute.INT64SLICE,
|
||||
attribute.FLOAT64SLICE,
|
||||
attribute.STRINGSLICE:
|
||||
json, _ := json.Marshal(keyValue.Value.AsInterface())
|
||||
a := (string)(json)
|
||||
data, _ := json.Marshal(keyValue.Value.AsInterface())
|
||||
a := (string)(data)
|
||||
tag = &gen.Tag{
|
||||
Key: string(keyValue.Key),
|
||||
VStr: &a,
|
||||
|
@ -67,13 +67,15 @@ func newUDPConn() (net.PacketConn, *net.UDPConn, error) {
|
||||
|
||||
addr, err := net.ResolveUDPAddr("udp", mockServer.LocalAddr().String())
|
||||
if err != nil {
|
||||
mockServer.Close()
|
||||
// Best effort.
|
||||
_ = mockServer.Close()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
conn, err := net.DialUDP("udp", nil, addr)
|
||||
if err != nil {
|
||||
mockServer.Close()
|
||||
// Best effort.
|
||||
_ = mockServer.Close()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
|
@ -34,6 +34,7 @@ type batchUploader interface {
|
||||
shutdown(context.Context) error
|
||||
}
|
||||
|
||||
// EndpointOption configures a Jaeger endpoint.
|
||||
type EndpointOption interface {
|
||||
newBatchUploader() (batchUploader, error)
|
||||
}
|
||||
@ -75,6 +76,7 @@ func WithAgentEndpoint(options ...AgentEndpointOption) EndpointOption {
|
||||
})
|
||||
}
|
||||
|
||||
// AgentEndpointOption configures a Jaeger agent endpoint.
|
||||
type AgentEndpointOption interface {
|
||||
apply(agentEndpointConfig) agentEndpointConfig
|
||||
}
|
||||
@ -175,6 +177,7 @@ func WithCollectorEndpoint(options ...CollectorEndpointOption) EndpointOption {
|
||||
})
|
||||
}
|
||||
|
||||
// CollectorEndpointOption configures a Jaeger collector endpoint.
|
||||
type CollectorEndpointOption interface {
|
||||
apply(collectorEndpointConfig) collectorEndpointConfig
|
||||
}
|
||||
@ -306,7 +309,9 @@ func (c *collectorUploader) upload(ctx context.Context, batch *gen.Batch) error
|
||||
}
|
||||
|
||||
_, _ = io.Copy(ioutil.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
if err = resp.Body.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode)
|
||||
|
@ -74,7 +74,6 @@ func (e *Exporter) Start(ctx context.Context) error {
|
||||
|
||||
// Shutdown flushes all exports and closes all connections to the receiving endpoint.
|
||||
func (e *Exporter) Shutdown(ctx context.Context) error {
|
||||
|
||||
e.mu.RLock()
|
||||
started := e.started
|
||||
e.mu.RUnlock()
|
||||
@ -95,6 +94,7 @@ func (e *Exporter) Shutdown(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// TemporalityFor returns the accepted temporality for a metric measurment.
|
||||
func (e *Exporter) TemporalityFor(descriptor *sdkapi.Descriptor, kind aggregation.Kind) aggregation.Temporality {
|
||||
return e.temporalitySelector.TemporalityFor(descriptor, kind)
|
||||
}
|
||||
|
@ -233,7 +233,7 @@ func TestHistogramInt64MetricGroupingExport(t *testing.T) {
|
||||
append(baseKeyValues, cpuKey.Int(1)),
|
||||
testLibName,
|
||||
)
|
||||
sum := 11.0
|
||||
sumVal := 11.0
|
||||
expected := []*metricpb.ResourceMetrics{
|
||||
{
|
||||
Resource: nil,
|
||||
@ -251,14 +251,14 @@ func TestHistogramInt64MetricGroupingExport(t *testing.T) {
|
||||
StartTimeUnixNano: startTime(),
|
||||
TimeUnixNano: pointTime(),
|
||||
Count: 2,
|
||||
Sum: &sum,
|
||||
Sum: &sumVal,
|
||||
ExplicitBounds: testHistogramBoundaries,
|
||||
BucketCounts: []uint64{1, 0, 0, 1},
|
||||
},
|
||||
{
|
||||
Attributes: cpu1Attrs,
|
||||
Count: 2,
|
||||
Sum: &sum,
|
||||
Sum: &sumVal,
|
||||
ExplicitBounds: testHistogramBoundaries,
|
||||
BucketCounts: []uint64{1, 0, 0, 1},
|
||||
StartTimeUnixNano: startTime(),
|
||||
@ -284,7 +284,7 @@ func TestHistogramFloat64MetricGroupingExport(t *testing.T) {
|
||||
append(baseKeyValues, cpuKey.Int(1)),
|
||||
testLibName,
|
||||
)
|
||||
sum := 11.0
|
||||
sumVal := 11.0
|
||||
expected := []*metricpb.ResourceMetrics{
|
||||
{
|
||||
Resource: nil,
|
||||
@ -302,14 +302,14 @@ func TestHistogramFloat64MetricGroupingExport(t *testing.T) {
|
||||
StartTimeUnixNano: startTime(),
|
||||
TimeUnixNano: pointTime(),
|
||||
Count: 2,
|
||||
Sum: &sum,
|
||||
Sum: &sumVal,
|
||||
ExplicitBounds: testHistogramBoundaries,
|
||||
BucketCounts: []uint64{1, 0, 0, 1},
|
||||
},
|
||||
{
|
||||
Attributes: cpu1Attrs,
|
||||
Count: 2,
|
||||
Sum: &sum,
|
||||
Sum: &sumVal,
|
||||
ExplicitBounds: testHistogramBoundaries,
|
||||
BucketCounts: []uint64{1, 0, 0, 1},
|
||||
StartTimeUnixNano: startTime(),
|
||||
|
@ -48,8 +48,8 @@ func Iterator(iter attribute.Iterator) []*commonpb.KeyValue {
|
||||
}
|
||||
|
||||
// ResourceAttributes transforms a Resource OTLP key-values.
|
||||
func ResourceAttributes(resource *resource.Resource) []*commonpb.KeyValue {
|
||||
return Iterator(resource.Iter())
|
||||
func ResourceAttributes(res *resource.Resource) []*commonpb.KeyValue {
|
||||
return Iterator(res.Iter())
|
||||
}
|
||||
|
||||
// KeyValue transforms an attribute KeyValue into an OTLP key-value.
|
||||
|
@ -168,7 +168,6 @@ func TestArrayAttributes(t *testing.T) {
|
||||
assertExpectedArrayValues(t, expected.Values, actual.Values)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -75,7 +75,6 @@ func InstrumentationLibraryReader(ctx context.Context, temporalitySelector aggre
|
||||
var sms []*metricpb.ScopeMetrics
|
||||
|
||||
err := ilmr.ForEach(func(lib instrumentation.Library, mr export.Reader) error {
|
||||
|
||||
records, errc := source(ctx, temporalitySelector, mr)
|
||||
|
||||
// Start a fixed number of goroutines to transform records.
|
||||
@ -194,7 +193,6 @@ func sink(ctx context.Context, in <-chan result) ([]*metricpb.Metric, error) {
|
||||
if !ok {
|
||||
grouped[mID] = res.Metric
|
||||
continue
|
||||
|
||||
}
|
||||
// Note: There is extra work happening in this code that can be
|
||||
// improved when the work described in #2119 is completed. The SDK has
|
||||
|
@ -165,7 +165,6 @@ func TestSumFloatDataPoints(t *testing.T) {
|
||||
}}}, m.GetSum())
|
||||
assert.Nil(t, m.GetHistogram())
|
||||
assert.Nil(t, m.GetSummary())
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -231,13 +230,13 @@ func (t *testAgg) Aggregation() aggregation.Aggregation {
|
||||
|
||||
// None of these three are used:
|
||||
|
||||
func (t *testAgg) Update(ctx context.Context, number number.Number, descriptor *sdkapi.Descriptor) error {
|
||||
func (t *testAgg) Update(context.Context, number.Number, *sdkapi.Descriptor) error {
|
||||
return nil
|
||||
}
|
||||
func (t *testAgg) SynchronizedMove(destination aggregator.Aggregator, descriptor *sdkapi.Descriptor) error {
|
||||
func (t *testAgg) SynchronizedMove(aggregator.Aggregator, *sdkapi.Descriptor) error {
|
||||
return nil
|
||||
}
|
||||
func (t *testAgg) Merge(aggregator aggregator.Aggregator, descriptor *sdkapi.Descriptor) error {
|
||||
func (t *testAgg) Merge(aggregator.Aggregator, *sdkapi.Descriptor) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -109,8 +109,7 @@ func WithEnvCompression(n string, fn func(Compression)) func(e *envconfig.EnvOpt
|
||||
return func(e *envconfig.EnvOptionsReader) {
|
||||
if v, ok := e.GetEnvValue(n); ok {
|
||||
cp := NoCompression
|
||||
switch v {
|
||||
case "gzip":
|
||||
if v == "gzip" {
|
||||
cp = GzipCompression
|
||||
}
|
||||
|
||||
|
@ -60,7 +60,7 @@ func (f *fileReader) readFile(filename string) ([]byte, error) {
|
||||
if b, ok := (*f)[filename]; ok {
|
||||
return b, nil
|
||||
}
|
||||
return nil, errors.New("File not found")
|
||||
return nil, errors.New("file not found")
|
||||
}
|
||||
|
||||
func TestConfigs(t *testing.T) {
|
||||
|
@ -265,8 +265,8 @@ func retryable(err error) (bool, time.Duration) {
|
||||
|
||||
// throttleDelay returns a duration to wait for if an explicit throttle time
|
||||
// is included in the response status.
|
||||
func throttleDelay(status *status.Status) time.Duration {
|
||||
for _, detail := range status.Details() {
|
||||
func throttleDelay(s *status.Status) time.Duration {
|
||||
for _, detail := range s.Details() {
|
||||
if t, ok := detail.(*errdetails.RetryInfo); ok {
|
||||
return t.RetryDelay.AsDuration()
|
||||
}
|
||||
|
@ -249,7 +249,6 @@ func TestNewExporterWithTimeout(t *testing.T) {
|
||||
|
||||
for _, tt := range tts {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
||||
mc := runMockCollector(t)
|
||||
if tt.delay {
|
||||
mc.metricSvc.delay = time.Second * 10
|
||||
|
@ -84,8 +84,7 @@ func WithReconnectionPeriod(rp time.Duration) Option {
|
||||
}
|
||||
|
||||
func compressorToCompression(compressor string) otlpconfig.Compression {
|
||||
switch compressor {
|
||||
case "gzip":
|
||||
if compressor == "gzip" {
|
||||
return otlpconfig.GzipCompression
|
||||
}
|
||||
|
||||
|
@ -118,8 +118,7 @@ func WithEnvCompression(n string, fn func(Compression)) func(e *envconfig.EnvOpt
|
||||
return func(e *envconfig.EnvOptionsReader) {
|
||||
if v, ok := e.GetEnvValue(n); ok {
|
||||
cp := NoCompression
|
||||
switch v {
|
||||
case "gzip":
|
||||
if v == "gzip" {
|
||||
cp = GzipCompression
|
||||
}
|
||||
|
||||
|
@ -60,7 +60,7 @@ func (f *fileReader) readFile(filename string) ([]byte, error) {
|
||||
if b, ok := (*f)[filename]; ok {
|
||||
return b, nil
|
||||
}
|
||||
return nil, errors.New("File not found")
|
||||
return nil, errors.New("file not found")
|
||||
}
|
||||
|
||||
func TestConfigs(t *testing.T) {
|
||||
|
@ -48,8 +48,8 @@ func Iterator(iter attribute.Iterator) []*commonpb.KeyValue {
|
||||
}
|
||||
|
||||
// ResourceAttributes transforms a Resource OTLP key-values.
|
||||
func ResourceAttributes(resource *resource.Resource) []*commonpb.KeyValue {
|
||||
return Iterator(resource.Iter())
|
||||
func ResourceAttributes(res *resource.Resource) []*commonpb.KeyValue {
|
||||
return Iterator(res.Iter())
|
||||
}
|
||||
|
||||
// KeyValue transforms an attribute KeyValue into an OTLP key-value.
|
||||
|
@ -168,7 +168,6 @@ func TestArrayAttributes(t *testing.T) {
|
||||
assertExpectedArrayValues(t, expected.Values, actual.Values)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -175,7 +175,6 @@ func TestStatus(t *testing.T) {
|
||||
expected := &tracepb.Status{Code: test.otlpStatus, Message: test.message}
|
||||
assert.Equal(t, expected, status(test.code, test.message))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestNilSpan(t *testing.T) {
|
||||
|
@ -265,8 +265,8 @@ func retryable(err error) (bool, time.Duration) {
|
||||
|
||||
// throttleDelay returns a duration to wait for if an explicit throttle time
|
||||
// is included in the response status.
|
||||
func throttleDelay(status *status.Status) time.Duration {
|
||||
for _, detail := range status.Details() {
|
||||
func throttleDelay(s *status.Status) time.Duration {
|
||||
for _, detail := range s.Details() {
|
||||
if t, ok := detail.(*errdetails.RetryInfo); ok {
|
||||
return t.RetryDelay.AsDuration()
|
||||
}
|
||||
|
@ -84,8 +84,7 @@ func WithReconnectionPeriod(rp time.Duration) Option {
|
||||
}
|
||||
|
||||
func compressorToCompression(compressor string) otlpconfig.Compression {
|
||||
switch compressor {
|
||||
case "gzip":
|
||||
if compressor == "gzip" {
|
||||
return otlpconfig.GzipCompression
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,7 @@ package otlptracehttp_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
@ -64,11 +65,11 @@ func newResource() *resource.Resource {
|
||||
)
|
||||
}
|
||||
|
||||
func installExportPipeline(ctx context.Context) func() {
|
||||
func installExportPipeline(ctx context.Context) (func(context.Context) error, error) {
|
||||
client := otlptracehttp.NewClient()
|
||||
exporter, err := otlptrace.New(ctx, client)
|
||||
if err != nil {
|
||||
log.Fatalf("creating OTLP trace exporter: %v", err)
|
||||
return nil, fmt.Errorf("creating OTLP trace exporter: %w", err)
|
||||
}
|
||||
|
||||
tracerProvider := sdktrace.NewTracerProvider(
|
||||
@ -77,18 +78,21 @@ func installExportPipeline(ctx context.Context) func() {
|
||||
)
|
||||
otel.SetTracerProvider(tracerProvider)
|
||||
|
||||
return func() {
|
||||
if err := tracerProvider.Shutdown(ctx); err != nil {
|
||||
log.Fatalf("stopping tracer provider: %v", err)
|
||||
}
|
||||
}
|
||||
return tracerProvider.Shutdown, nil
|
||||
}
|
||||
|
||||
func Example() {
|
||||
ctx := context.Background()
|
||||
// Registers a tracer Provider globally.
|
||||
cleanup := installExportPipeline(ctx)
|
||||
defer cleanup()
|
||||
shutdown, err := installExportPipeline(ctx)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
if err := shutdown(ctx); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
log.Println("the answer is", add(ctx, multiply(ctx, multiply(ctx, 2, 2), 10), 2))
|
||||
}
|
||||
|
@ -88,7 +88,7 @@ type Config struct {
|
||||
|
||||
// New returns a new Prometheus exporter using the configured metric
|
||||
// controller. See controller.New().
|
||||
func New(config Config, controller *controller.Controller) (*Exporter, error) {
|
||||
func New(config Config, ctrl *controller.Controller) (*Exporter, error) {
|
||||
if config.Registry == nil {
|
||||
config.Registry = prometheus.NewRegistry()
|
||||
}
|
||||
@ -105,7 +105,7 @@ func New(config Config, controller *controller.Controller) (*Exporter, error) {
|
||||
handler: promhttp.HandlerFor(config.Gatherer, promhttp.HandlerOpts{}),
|
||||
registerer: config.Registerer,
|
||||
gatherer: config.Gatherer,
|
||||
controller: controller,
|
||||
controller: ctrl,
|
||||
}
|
||||
|
||||
c := &collector{
|
||||
@ -176,7 +176,6 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
|
||||
|
||||
err := ctrl.ForEach(func(_ instrumentation.Library, reader export.Reader) error {
|
||||
return reader.ForEach(c.exp, func(record export.Record) error {
|
||||
|
||||
agg := record.Aggregation()
|
||||
numberKind := record.Descriptor().NumberKind()
|
||||
instrumentKind := record.Descriptor().InstrumentKind()
|
||||
@ -186,23 +185,26 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
|
||||
|
||||
desc := c.toDesc(record, attrKeys)
|
||||
|
||||
if hist, ok := agg.(aggregation.Histogram); ok {
|
||||
if err := c.exportHistogram(ch, hist, numberKind, desc, attrs); err != nil {
|
||||
switch v := agg.(type) {
|
||||
case aggregation.Histogram:
|
||||
if err := c.exportHistogram(ch, v, numberKind, desc, attrs); err != nil {
|
||||
return fmt.Errorf("exporting histogram: %w", err)
|
||||
}
|
||||
} else if sum, ok := agg.(aggregation.Sum); ok && instrumentKind.Monotonic() {
|
||||
if err := c.exportMonotonicCounter(ch, sum, numberKind, desc, attrs); err != nil {
|
||||
return fmt.Errorf("exporting monotonic counter: %w", err)
|
||||
case aggregation.Sum:
|
||||
if instrumentKind.Monotonic() {
|
||||
if err := c.exportMonotonicCounter(ch, v, numberKind, desc, attrs); err != nil {
|
||||
return fmt.Errorf("exporting monotonic counter: %w", err)
|
||||
}
|
||||
} else {
|
||||
if err := c.exportNonMonotonicCounter(ch, v, numberKind, desc, attrs); err != nil {
|
||||
return fmt.Errorf("exporting non monotonic counter: %w", err)
|
||||
}
|
||||
}
|
||||
} else if sum, ok := agg.(aggregation.Sum); ok && !instrumentKind.Monotonic() {
|
||||
if err := c.exportNonMonotonicCounter(ch, sum, numberKind, desc, attrs); err != nil {
|
||||
return fmt.Errorf("exporting non monotonic counter: %w", err)
|
||||
}
|
||||
} else if lastValue, ok := agg.(aggregation.LastValue); ok {
|
||||
if err := c.exportLastValue(ch, lastValue, numberKind, desc, attrs); err != nil {
|
||||
case aggregation.LastValue:
|
||||
if err := c.exportLastValue(ch, v, numberKind, desc, attrs); err != nil {
|
||||
return fmt.Errorf("exporting last value: %w", err)
|
||||
}
|
||||
} else {
|
||||
default:
|
||||
return fmt.Errorf("%w: %s", ErrUnsupportedAggregator, agg.Kind())
|
||||
}
|
||||
return nil
|
||||
|
@ -111,7 +111,7 @@ func TestPrometheusExporter(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
counter, err := meter.SyncFloat64().Counter("counter")
|
||||
require.NoError(t, err)
|
||||
histogram, err := meter.SyncFloat64().Histogram("histogram")
|
||||
hist, err := meter.SyncFloat64().Histogram("histogram")
|
||||
require.NoError(t, err)
|
||||
|
||||
attrs := []attribute.KeyValue{
|
||||
@ -137,10 +137,10 @@ func TestPrometheusExporter(t *testing.T) {
|
||||
|
||||
expected = append(expected, expectGauge("intgaugeobserver", `intgaugeobserver{A="B",C="D",R="V"} 1`))
|
||||
|
||||
histogram.Record(ctx, -0.6, attrs...)
|
||||
histogram.Record(ctx, -0.4, attrs...)
|
||||
histogram.Record(ctx, 0.6, attrs...)
|
||||
histogram.Record(ctx, 20, attrs...)
|
||||
hist.Record(ctx, -0.6, attrs...)
|
||||
hist.Record(ctx, -0.4, attrs...)
|
||||
hist.Record(ctx, 0.6, attrs...)
|
||||
hist.Record(ctx, 20, attrs...)
|
||||
|
||||
expected = append(expected, expectHistogram("histogram",
|
||||
`histogram_bucket{A="B",C="D",R="V",le="-0.5"} 1`,
|
||||
|
@ -55,7 +55,6 @@ func newConfig(options ...Option) (config, error) {
|
||||
}
|
||||
for _, opt := range options {
|
||||
cfg = opt.apply(cfg)
|
||||
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
@ -12,8 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package stdout contains an OpenTelemetry exporter for metric telemetry
|
||||
// to be written to an output destination as JSON.
|
||||
// Package stdoutmetric contains an OpenTelemetry exporter for metric
|
||||
// telemetry to be written to an output destination as JSON.
|
||||
//
|
||||
// This package is currently in a pre-GA phase. Backwards incompatible changes
|
||||
// may be introduced in subsequent minor version releases as we work to track
|
||||
|
@ -16,6 +16,7 @@ package stdoutmetric_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
@ -60,10 +61,10 @@ func multiply(ctx context.Context, x, y int64) int64 {
|
||||
return x * y
|
||||
}
|
||||
|
||||
func InstallExportPipeline(ctx context.Context) func() {
|
||||
func InstallExportPipeline(ctx context.Context) (func(context.Context) error, error) {
|
||||
exporter, err := stdoutmetric.New(stdoutmetric.WithPrettyPrint())
|
||||
if err != nil {
|
||||
log.Fatalf("creating stdoutmetric exporter: %v", err)
|
||||
return nil, fmt.Errorf("creating stdoutmetric exporter: %w", err)
|
||||
}
|
||||
|
||||
pusher := controller.New(
|
||||
@ -89,19 +90,22 @@ func InstallExportPipeline(ctx context.Context) func() {
|
||||
log.Fatalf("creating instrument: %v", err)
|
||||
}
|
||||
|
||||
return func() {
|
||||
if err := pusher.Stop(ctx); err != nil {
|
||||
log.Fatalf("stopping push controller: %v", err)
|
||||
}
|
||||
}
|
||||
return pusher.Stop, nil
|
||||
}
|
||||
|
||||
func Example() {
|
||||
ctx := context.Background()
|
||||
|
||||
// TODO: Registers a meter Provider globally.
|
||||
cleanup := InstallExportPipeline(ctx)
|
||||
defer cleanup()
|
||||
shutdown, err := InstallExportPipeline(ctx)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
if err := shutdown(ctx); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
log.Println("the answer is", add(ctx, multiply(ctx, multiply(ctx, 2, 2), 10), 2))
|
||||
}
|
||||
|
@ -16,6 +16,8 @@ package stdoutmetric // import "go.opentelemetry.io/otel/exporters/stdout/stdout
|
||||
|
||||
import "go.opentelemetry.io/otel/sdk/metric/export"
|
||||
|
||||
// Exporter is an OpenTelemetry metric exporter that transmits telemetry to
|
||||
// the local STDOUT.
|
||||
type Exporter struct {
|
||||
metricExporter
|
||||
}
|
||||
|
@ -53,7 +53,6 @@ func (e *metricExporter) Export(_ context.Context, res *resource.Resource, reade
|
||||
var aggError error
|
||||
var batch []line
|
||||
aggError = reader.ForEach(func(lib instrumentation.Library, mr export.Reader) error {
|
||||
|
||||
var instAttrs []attribute.KeyValue
|
||||
if name := lib.Name; name != "" {
|
||||
instAttrs = append(instAttrs, attribute.String("instrumentation.name", name))
|
||||
@ -101,20 +100,20 @@ func (e *metricExporter) Export(_ context.Context, res *resource.Resource, reade
|
||||
|
||||
var sb strings.Builder
|
||||
|
||||
sb.WriteString(desc.Name())
|
||||
_, _ = sb.WriteString(desc.Name())
|
||||
|
||||
if len(encodedAttrs) > 0 || len(encodedResource) > 0 || len(encodedInstAttrs) > 0 {
|
||||
sb.WriteRune('{')
|
||||
sb.WriteString(encodedResource)
|
||||
_, _ = sb.WriteRune('{')
|
||||
_, _ = sb.WriteString(encodedResource)
|
||||
if len(encodedInstAttrs) > 0 && len(encodedResource) > 0 {
|
||||
sb.WriteRune(',')
|
||||
_, _ = sb.WriteRune(',')
|
||||
}
|
||||
sb.WriteString(encodedInstAttrs)
|
||||
_, _ = sb.WriteString(encodedInstAttrs)
|
||||
if len(encodedAttrs) > 0 && (len(encodedInstAttrs) > 0 || len(encodedResource) > 0) {
|
||||
sb.WriteRune(',')
|
||||
_, _ = sb.WriteRune(',')
|
||||
}
|
||||
sb.WriteString(encodedAttrs)
|
||||
sb.WriteRune('}')
|
||||
_, _ = sb.WriteString(encodedAttrs)
|
||||
_, _ = sb.WriteRune('}')
|
||||
}
|
||||
|
||||
expose.Name = sb.String()
|
||||
|
@ -48,7 +48,6 @@ func newConfig(options ...Option) (config, error) {
|
||||
}
|
||||
for _, opt := range options {
|
||||
cfg = opt.apply(cfg)
|
||||
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
@ -12,6 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package stdout contains an OpenTelemetry exporter for tracing
|
||||
// Package stdouttrace contains an OpenTelemetry exporter for tracing
|
||||
// telemetry to be written to an output destination as JSON.
|
||||
package stdouttrace // import "go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
|
||||
|
@ -16,6 +16,7 @@ package stdouttrace_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
@ -63,10 +64,10 @@ func Resource() *resource.Resource {
|
||||
)
|
||||
}
|
||||
|
||||
func InstallExportPipeline(ctx context.Context) func() {
|
||||
func InstallExportPipeline(ctx context.Context) (func(context.Context) error, error) {
|
||||
exporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
|
||||
if err != nil {
|
||||
log.Fatalf("creating stdout exporter: %v", err)
|
||||
return nil, fmt.Errorf("creating stdout exporter: %w", err)
|
||||
}
|
||||
|
||||
tracerProvider := sdktrace.NewTracerProvider(
|
||||
@ -75,19 +76,22 @@ func InstallExportPipeline(ctx context.Context) func() {
|
||||
)
|
||||
otel.SetTracerProvider(tracerProvider)
|
||||
|
||||
return func() {
|
||||
if err := tracerProvider.Shutdown(ctx); err != nil {
|
||||
log.Fatalf("stopping tracer provider: %v", err)
|
||||
}
|
||||
}
|
||||
return tracerProvider.Shutdown, nil
|
||||
}
|
||||
|
||||
func Example() {
|
||||
ctx := context.Background()
|
||||
|
||||
// Registers a tracer Provider globally.
|
||||
cleanup := InstallExportPipeline(ctx)
|
||||
defer cleanup()
|
||||
shutdown, err := InstallExportPipeline(ctx)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
if err := shutdown(ctx); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
log.Println("the answer is", add(ctx, multiply(ctx, multiply(ctx, 2, 2), 10), 2))
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ func TestExporterExportSpan(t *testing.T) {
|
||||
traceState, _ := trace.ParseTraceState("key=val")
|
||||
keyValue := "value"
|
||||
doubleValue := 123.456
|
||||
resource := resource.NewSchemaless(attribute.String("rk1", "rv11"))
|
||||
res := resource.NewSchemaless(attribute.String("rk1", "rv11"))
|
||||
|
||||
ss := tracetest.SpanStub{
|
||||
SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
|
||||
@ -65,7 +65,7 @@ func TestExporterExportSpan(t *testing.T) {
|
||||
Code: codes.Error,
|
||||
Description: "interesting",
|
||||
},
|
||||
Resource: resource,
|
||||
Resource: res,
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
|
@ -26,7 +26,6 @@ import (
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
tracesdk "go.opentelemetry.io/otel/sdk/trace"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
@ -160,8 +159,8 @@ func toZipkinAnnotations(events []tracesdk.Event) []zkmodel.Annotation {
|
||||
|
||||
func attributesToJSONMapString(attributes []attribute.KeyValue) string {
|
||||
m := make(map[string]interface{}, len(attributes))
|
||||
for _, attribute := range attributes {
|
||||
m[(string)(attribute.Key)] = attribute.Value.AsInterface()
|
||||
for _, a := range attributes {
|
||||
m[(string)(a.Key)] = a.Value.AsInterface()
|
||||
}
|
||||
// if an error happens, the result will be an empty string
|
||||
jsonBytes, _ := json.Marshal(m)
|
||||
@ -173,17 +172,17 @@ func attributeToStringPair(kv attribute.KeyValue) (string, string) {
|
||||
switch kv.Value.Type() {
|
||||
// For slice attributes, serialize as JSON list string.
|
||||
case attribute.BOOLSLICE:
|
||||
json, _ := json.Marshal(kv.Value.AsBoolSlice())
|
||||
return (string)(kv.Key), (string)(json)
|
||||
data, _ := json.Marshal(kv.Value.AsBoolSlice())
|
||||
return (string)(kv.Key), (string)(data)
|
||||
case attribute.INT64SLICE:
|
||||
json, _ := json.Marshal(kv.Value.AsInt64Slice())
|
||||
return (string)(kv.Key), (string)(json)
|
||||
data, _ := json.Marshal(kv.Value.AsInt64Slice())
|
||||
return (string)(kv.Key), (string)(data)
|
||||
case attribute.FLOAT64SLICE:
|
||||
json, _ := json.Marshal(kv.Value.AsFloat64Slice())
|
||||
return (string)(kv.Key), (string)(json)
|
||||
data, _ := json.Marshal(kv.Value.AsFloat64Slice())
|
||||
return (string)(kv.Key), (string)(data)
|
||||
case attribute.STRINGSLICE:
|
||||
json, _ := json.Marshal(kv.Value.AsStringSlice())
|
||||
return (string)(kv.Key), (string)(json)
|
||||
data, _ := json.Marshal(kv.Value.AsStringSlice())
|
||||
return (string)(kv.Key), (string)(data)
|
||||
default:
|
||||
return (string)(kv.Key), kv.Value.Emit()
|
||||
}
|
||||
@ -245,7 +244,7 @@ var remoteEndpointKeyRank = map[attribute.Key]int{
|
||||
semconv.DBNameKey: 6,
|
||||
}
|
||||
|
||||
func toZipkinRemoteEndpoint(data sdktrace.ReadOnlySpan) *zkmodel.Endpoint {
|
||||
func toZipkinRemoteEndpoint(data tracesdk.ReadOnlySpan) *zkmodel.Endpoint {
|
||||
// Should be set only for client or producer kind
|
||||
if sk := data.SpanKind(); sk != trace.SpanKindClient && sk != trace.SpanKindProducer {
|
||||
return nil
|
||||
|
@ -37,7 +37,7 @@ import (
|
||||
)
|
||||
|
||||
func TestModelConversion(t *testing.T) {
|
||||
resource := resource.NewSchemaless(
|
||||
res := resource.NewSchemaless(
|
||||
semconv.ServiceNameKey.String("model-test"),
|
||||
semconv.ServiceVersionKey.String("0.1.0"),
|
||||
attribute.Int64("resource-attr1", 42),
|
||||
@ -82,7 +82,7 @@ func TestModelConversion(t *testing.T) {
|
||||
Code: codes.Error,
|
||||
Description: "404, file not found",
|
||||
},
|
||||
Resource: resource,
|
||||
Resource: res,
|
||||
},
|
||||
// span data with no parent (same as typical, but has
|
||||
// invalid parent)
|
||||
@ -117,7 +117,7 @@ func TestModelConversion(t *testing.T) {
|
||||
Code: codes.Error,
|
||||
Description: "404, file not found",
|
||||
},
|
||||
Resource: resource,
|
||||
Resource: res,
|
||||
},
|
||||
// span data of unspecified kind
|
||||
{
|
||||
@ -155,7 +155,7 @@ func TestModelConversion(t *testing.T) {
|
||||
Code: codes.Error,
|
||||
Description: "404, file not found",
|
||||
},
|
||||
Resource: resource,
|
||||
Resource: res,
|
||||
},
|
||||
// span data of internal kind
|
||||
{
|
||||
@ -193,7 +193,7 @@ func TestModelConversion(t *testing.T) {
|
||||
Code: codes.Error,
|
||||
Description: "404, file not found",
|
||||
},
|
||||
Resource: resource,
|
||||
Resource: res,
|
||||
},
|
||||
// span data of client kind
|
||||
{
|
||||
@ -234,7 +234,7 @@ func TestModelConversion(t *testing.T) {
|
||||
Code: codes.Error,
|
||||
Description: "404, file not found",
|
||||
},
|
||||
Resource: resource,
|
||||
Resource: res,
|
||||
},
|
||||
// span data of producer kind
|
||||
{
|
||||
@ -272,7 +272,7 @@ func TestModelConversion(t *testing.T) {
|
||||
Code: codes.Error,
|
||||
Description: "404, file not found",
|
||||
},
|
||||
Resource: resource,
|
||||
Resource: res,
|
||||
},
|
||||
// span data of consumer kind
|
||||
{
|
||||
@ -310,7 +310,7 @@ func TestModelConversion(t *testing.T) {
|
||||
Code: codes.Error,
|
||||
Description: "404, file not found",
|
||||
},
|
||||
Resource: resource,
|
||||
Resource: res,
|
||||
},
|
||||
// span data with no events
|
||||
{
|
||||
@ -335,7 +335,7 @@ func TestModelConversion(t *testing.T) {
|
||||
Code: codes.Error,
|
||||
Description: "404, file not found",
|
||||
},
|
||||
Resource: resource,
|
||||
Resource: res,
|
||||
},
|
||||
// span data with an "error" attribute set to "false"
|
||||
{
|
||||
@ -368,7 +368,7 @@ func TestModelConversion(t *testing.T) {
|
||||
Attributes: nil,
|
||||
},
|
||||
},
|
||||
Resource: resource,
|
||||
Resource: res,
|
||||
},
|
||||
}.Snapshots()
|
||||
|
||||
|
@ -198,7 +198,7 @@ func logStoreLogger(s *logStore) *log.Logger {
|
||||
}
|
||||
|
||||
func TestExportSpans(t *testing.T) {
|
||||
resource := resource.NewSchemaless(
|
||||
res := resource.NewSchemaless(
|
||||
semconv.ServiceNameKey.String("exporter-test"),
|
||||
semconv.ServiceVersionKey.String("0.1.0"),
|
||||
)
|
||||
@ -220,7 +220,7 @@ func TestExportSpans(t *testing.T) {
|
||||
Code: codes.Error,
|
||||
Description: "404, file not found",
|
||||
},
|
||||
Resource: resource,
|
||||
Resource: res,
|
||||
},
|
||||
// child
|
||||
{
|
||||
@ -242,7 +242,7 @@ func TestExportSpans(t *testing.T) {
|
||||
Code: codes.Error,
|
||||
Description: "403, forbidden",
|
||||
},
|
||||
Resource: resource,
|
||||
Resource: res,
|
||||
},
|
||||
}.Snapshots()
|
||||
models := []zkmodel.SpanModel{
|
||||
|
@ -56,7 +56,6 @@ func defaultErrorHandler() *delegator {
|
||||
lock: &sync.RWMutex{},
|
||||
eh: &errLogger{l: log.New(os.Stderr, "", log.LstdFlags)},
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// errLogger logs errors if no delegate is set, otherwise they are delegated.
|
||||
|
@ -125,7 +125,7 @@ func TestHandlerTestSuite(t *testing.T) {
|
||||
|
||||
func TestHandlerRace(t *testing.T) {
|
||||
go SetErrorHandler(&errLogger{log.New(os.Stderr, "", 0)})
|
||||
go Handle(errors.New("Error"))
|
||||
go Handle(errors.New("error"))
|
||||
}
|
||||
|
||||
func BenchmarkErrorHandler(b *testing.B) {
|
||||
@ -135,7 +135,7 @@ func BenchmarkErrorHandler(b *testing.B) {
|
||||
|
||||
globalErrorHandler.setDelegate(primary)
|
||||
|
||||
err := errors.New("BenchmarkErrorHandler")
|
||||
err := errors.New("benchmark error handler")
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
@ -184,7 +184,7 @@ func BenchmarkDefaultErrorHandlerHandle(b *testing.B) {
|
||||
)
|
||||
|
||||
eh := GetErrorHandler()
|
||||
err := errors.New("BenchmarkDefaultErrorHandlerHandle")
|
||||
err := errors.New("benchmark default error handler handle")
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
@ -198,7 +198,7 @@ func BenchmarkDefaultErrorHandlerHandle(b *testing.B) {
|
||||
func BenchmarkDelegatedErrorHandlerHandle(b *testing.B) {
|
||||
eh := GetErrorHandler()
|
||||
SetErrorHandler(&errLogger{l: log.New(ioutil.Discard, "", 0)})
|
||||
err := errors.New("BenchmarkDelegatedErrorHandlerHandle")
|
||||
err := errors.New("benchmark delegated error handler handle")
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
@ -39,8 +39,7 @@ type baggageState struct {
|
||||
// Passing nil SetHookFunc creates a context with no set hook to call.
|
||||
func ContextWithSetHook(parent context.Context, hook SetHookFunc) context.Context {
|
||||
var s baggageState
|
||||
switch v := parent.Value(baggageKey).(type) {
|
||||
case baggageState:
|
||||
if v, ok := parent.Value(baggageKey).(baggageState); ok {
|
||||
s = v
|
||||
}
|
||||
|
||||
@ -54,8 +53,7 @@ func ContextWithSetHook(parent context.Context, hook SetHookFunc) context.Contex
|
||||
// Passing nil GetHookFunc creates a context with no get hook to call.
|
||||
func ContextWithGetHook(parent context.Context, hook GetHookFunc) context.Context {
|
||||
var s baggageState
|
||||
switch v := parent.Value(baggageKey).(type) {
|
||||
case baggageState:
|
||||
if v, ok := parent.Value(baggageKey).(baggageState); ok {
|
||||
s = v
|
||||
}
|
||||
|
||||
@ -67,8 +65,7 @@ func ContextWithGetHook(parent context.Context, hook GetHookFunc) context.Contex
|
||||
// returns a context without any baggage.
|
||||
func ContextWithList(parent context.Context, list List) context.Context {
|
||||
var s baggageState
|
||||
switch v := parent.Value(baggageKey).(type) {
|
||||
case baggageState:
|
||||
if v, ok := parent.Value(baggageKey).(baggageState); ok {
|
||||
s = v
|
||||
}
|
||||
|
||||
|
@ -64,7 +64,7 @@ func (e *Expectation) NotToBeNil() {
|
||||
func (e *Expectation) ToBeTrue() {
|
||||
switch a := e.actual.(type) {
|
||||
case bool:
|
||||
if e.actual == false {
|
||||
if !a {
|
||||
e.fail(fmt.Sprintf("Expected\n\t%v\nto be true", e.actual))
|
||||
}
|
||||
default:
|
||||
@ -75,7 +75,7 @@ func (e *Expectation) ToBeTrue() {
|
||||
func (e *Expectation) ToBeFalse() {
|
||||
switch a := e.actual.(type) {
|
||||
case bool:
|
||||
if e.actual == true {
|
||||
if a {
|
||||
e.fail(fmt.Sprintf("Expected\n\t%v\nto be false", e.actual))
|
||||
}
|
||||
default:
|
||||
@ -253,32 +253,33 @@ func (e *Expectation) ToMatchInAnyOrder(expected interface{}) {
|
||||
|
||||
func (e *Expectation) ToBeTemporally(matcher TemporalMatcher, compareTo interface{}) {
|
||||
if actual, ok := e.actual.(time.Time); ok {
|
||||
if ct, ok := compareTo.(time.Time); ok {
|
||||
switch matcher {
|
||||
case Before:
|
||||
if !actual.Before(ct) {
|
||||
e.fail(fmt.Sprintf("Expected\n\t%v\nto be temporally before\n\t%v", e.actual, compareTo))
|
||||
}
|
||||
case BeforeOrSameTime:
|
||||
if actual.After(ct) {
|
||||
e.fail(fmt.Sprintf("Expected\n\t%v\nto be temporally before or at the same time as\n\t%v", e.actual, compareTo))
|
||||
}
|
||||
case After:
|
||||
if !actual.After(ct) {
|
||||
e.fail(fmt.Sprintf("Expected\n\t%v\nto be temporally after\n\t%v", e.actual, compareTo))
|
||||
}
|
||||
case AfterOrSameTime:
|
||||
if actual.Before(ct) {
|
||||
e.fail(fmt.Sprintf("Expected\n\t%v\nto be temporally after or at the same time as\n\t%v", e.actual, compareTo))
|
||||
}
|
||||
default:
|
||||
e.fail("Cannot compare times with unexpected temporal matcher")
|
||||
}
|
||||
} else {
|
||||
ct, ok := compareTo.(time.Time)
|
||||
if !ok {
|
||||
e.fail(fmt.Sprintf("Cannot compare to non-temporal value\n\t%v", compareTo))
|
||||
return
|
||||
}
|
||||
|
||||
switch matcher {
|
||||
case Before:
|
||||
if !actual.Before(ct) {
|
||||
e.fail(fmt.Sprintf("Expected\n\t%v\nto be temporally before\n\t%v", e.actual, compareTo))
|
||||
}
|
||||
case BeforeOrSameTime:
|
||||
if actual.After(ct) {
|
||||
e.fail(fmt.Sprintf("Expected\n\t%v\nto be temporally before or at the same time as\n\t%v", e.actual, compareTo))
|
||||
}
|
||||
case After:
|
||||
if !actual.After(ct) {
|
||||
e.fail(fmt.Sprintf("Expected\n\t%v\nto be temporally after\n\t%v", e.actual, compareTo))
|
||||
}
|
||||
case AfterOrSameTime:
|
||||
if actual.Before(ct) {
|
||||
e.fail(fmt.Sprintf("Expected\n\t%v\nto be temporally after or at the same time as\n\t%v", e.actual, compareTo))
|
||||
}
|
||||
default:
|
||||
e.fail("Cannot compare times with unexpected temporal matcher")
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -19,7 +19,7 @@ import (
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
func BoolToRaw(b bool) uint64 {
|
||||
func BoolToRaw(b bool) uint64 { // nolint:revive // b is not a control flag.
|
||||
if b {
|
||||
return 1
|
||||
}
|
||||
|
@ -44,7 +44,6 @@ func ExampleMeter_synchronous() {
|
||||
// Do work
|
||||
// ...
|
||||
workDuration.Record(ctx, time.Since(startTime).Milliseconds())
|
||||
|
||||
}
|
||||
|
||||
//nolint:govet // Meter doesn't register for go vet
|
||||
@ -111,6 +110,4 @@ func ExampleMeter_asynchronous_multiple() {
|
||||
}
|
||||
|
||||
//This is just an example, see the the contrib runtime instrumentation for real implementation.
|
||||
func computeGCPauses(ctx context.Context, recorder syncfloat64.Histogram, pauseBuff []uint64) {
|
||||
|
||||
}
|
||||
func computeGCPauses(ctx context.Context, recorder syncfloat64.Histogram, pauseBuff []uint64) {}
|
||||
|
@ -61,9 +61,9 @@ func WithDescription(desc string) Option {
|
||||
}
|
||||
|
||||
// WithUnit applies provided unit.
|
||||
func WithUnit(unit unit.Unit) Option {
|
||||
func WithUnit(u unit.Unit) Option {
|
||||
return optionFunc(func(cfg Config) Config {
|
||||
cfg.unit = unit
|
||||
cfg.unit = u
|
||||
return cfg
|
||||
})
|
||||
}
|
||||
|
@ -38,14 +38,12 @@ type afCounter struct {
|
||||
}
|
||||
|
||||
func (i *afCounter) setDelegate(m metric.Meter) {
|
||||
|
||||
ctr, err := m.AsyncFloat64().Counter(i.name, i.opts...)
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
return
|
||||
}
|
||||
i.delegate.Store(ctr)
|
||||
|
||||
}
|
||||
|
||||
func (i *afCounter) Observe(ctx context.Context, x float64, attrs ...attribute.KeyValue) {
|
||||
@ -71,14 +69,12 @@ type afUpDownCounter struct {
|
||||
}
|
||||
|
||||
func (i *afUpDownCounter) setDelegate(m metric.Meter) {
|
||||
|
||||
ctr, err := m.AsyncFloat64().UpDownCounter(i.name, i.opts...)
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
return
|
||||
}
|
||||
i.delegate.Store(ctr)
|
||||
|
||||
}
|
||||
|
||||
func (i *afUpDownCounter) Observe(ctx context.Context, x float64, attrs ...attribute.KeyValue) {
|
||||
@ -104,14 +100,12 @@ type afGauge struct {
|
||||
}
|
||||
|
||||
func (i *afGauge) setDelegate(m metric.Meter) {
|
||||
|
||||
ctr, err := m.AsyncFloat64().Gauge(i.name, i.opts...)
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
return
|
||||
}
|
||||
i.delegate.Store(ctr)
|
||||
|
||||
}
|
||||
|
||||
func (i *afGauge) Observe(ctx context.Context, x float64, attrs ...attribute.KeyValue) {
|
||||
@ -137,14 +131,12 @@ type aiCounter struct {
|
||||
}
|
||||
|
||||
func (i *aiCounter) setDelegate(m metric.Meter) {
|
||||
|
||||
ctr, err := m.AsyncInt64().Counter(i.name, i.opts...)
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
return
|
||||
}
|
||||
i.delegate.Store(ctr)
|
||||
|
||||
}
|
||||
|
||||
func (i *aiCounter) Observe(ctx context.Context, x int64, attrs ...attribute.KeyValue) {
|
||||
@ -170,14 +162,12 @@ type aiUpDownCounter struct {
|
||||
}
|
||||
|
||||
func (i *aiUpDownCounter) setDelegate(m metric.Meter) {
|
||||
|
||||
ctr, err := m.AsyncInt64().UpDownCounter(i.name, i.opts...)
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
return
|
||||
}
|
||||
i.delegate.Store(ctr)
|
||||
|
||||
}
|
||||
|
||||
func (i *aiUpDownCounter) Observe(ctx context.Context, x int64, attrs ...attribute.KeyValue) {
|
||||
@ -203,14 +193,12 @@ type aiGauge struct {
|
||||
}
|
||||
|
||||
func (i *aiGauge) setDelegate(m metric.Meter) {
|
||||
|
||||
ctr, err := m.AsyncInt64().Gauge(i.name, i.opts...)
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
return
|
||||
}
|
||||
i.delegate.Store(ctr)
|
||||
|
||||
}
|
||||
|
||||
func (i *aiGauge) Observe(ctx context.Context, x int64, attrs ...attribute.KeyValue) {
|
||||
@ -237,14 +225,12 @@ type sfCounter struct {
|
||||
}
|
||||
|
||||
func (i *sfCounter) setDelegate(m metric.Meter) {
|
||||
|
||||
ctr, err := m.SyncFloat64().Counter(i.name, i.opts...)
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
return
|
||||
}
|
||||
i.delegate.Store(ctr)
|
||||
|
||||
}
|
||||
|
||||
func (i *sfCounter) Add(ctx context.Context, incr float64, attrs ...attribute.KeyValue) {
|
||||
@ -263,14 +249,12 @@ type sfUpDownCounter struct {
|
||||
}
|
||||
|
||||
func (i *sfUpDownCounter) setDelegate(m metric.Meter) {
|
||||
|
||||
ctr, err := m.SyncFloat64().UpDownCounter(i.name, i.opts...)
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
return
|
||||
}
|
||||
i.delegate.Store(ctr)
|
||||
|
||||
}
|
||||
|
||||
func (i *sfUpDownCounter) Add(ctx context.Context, incr float64, attrs ...attribute.KeyValue) {
|
||||
@ -295,7 +279,6 @@ func (i *sfHistogram) setDelegate(m metric.Meter) {
|
||||
return
|
||||
}
|
||||
i.delegate.Store(ctr)
|
||||
|
||||
}
|
||||
|
||||
func (i *sfHistogram) Record(ctx context.Context, x float64, attrs ...attribute.KeyValue) {
|
||||
@ -314,14 +297,12 @@ type siCounter struct {
|
||||
}
|
||||
|
||||
func (i *siCounter) setDelegate(m metric.Meter) {
|
||||
|
||||
ctr, err := m.SyncInt64().Counter(i.name, i.opts...)
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
return
|
||||
}
|
||||
i.delegate.Store(ctr)
|
||||
|
||||
}
|
||||
|
||||
func (i *siCounter) Add(ctx context.Context, x int64, attrs ...attribute.KeyValue) {
|
||||
@ -340,14 +321,12 @@ type siUpDownCounter struct {
|
||||
}
|
||||
|
||||
func (i *siUpDownCounter) setDelegate(m metric.Meter) {
|
||||
|
||||
ctr, err := m.SyncInt64().UpDownCounter(i.name, i.opts...)
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
return
|
||||
}
|
||||
i.delegate.Store(ctr)
|
||||
|
||||
}
|
||||
|
||||
func (i *siUpDownCounter) Add(ctx context.Context, x int64, attrs ...attribute.KeyValue) {
|
||||
@ -366,14 +345,12 @@ type siHistogram struct {
|
||||
}
|
||||
|
||||
func (i *siHistogram) setDelegate(m metric.Meter) {
|
||||
|
||||
ctr, err := m.SyncInt64().Histogram(i.name, i.opts...)
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
return
|
||||
}
|
||||
i.delegate.Store(ctr)
|
||||
|
||||
}
|
||||
|
||||
func (i *siHistogram) Record(ctx context.Context, x int64, attrs ...attribute.KeyValue) {
|
||||
|
@ -45,7 +45,6 @@ func TestMeterProviderRace(t *testing.T) {
|
||||
|
||||
mp.setDelegate(metric.NewNoopMeterProvider())
|
||||
close(finish)
|
||||
|
||||
}
|
||||
|
||||
func TestMeterRace(t *testing.T) {
|
||||
@ -88,7 +87,6 @@ func TestMeterRace(t *testing.T) {
|
||||
}
|
||||
|
||||
func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Counter, asyncfloat64.Counter) {
|
||||
|
||||
afcounter, err := m.AsyncFloat64().Counter("test_Async_Counter")
|
||||
require.NoError(t, err)
|
||||
_, err = m.AsyncFloat64().UpDownCounter("test_Async_UpDownCounter")
|
||||
@ -142,7 +140,6 @@ func testCollect(t *testing.T, m metric.Meter) {
|
||||
}
|
||||
|
||||
func TestMeterProviderDelegatesCalls(t *testing.T) {
|
||||
|
||||
// The global MeterProvider should directly call the underlying MeterProvider
|
||||
// if it is set prior to Meter() being called.
|
||||
|
||||
@ -184,7 +181,6 @@ func TestMeterProviderDelegatesCalls(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMeterDelegatesCalls(t *testing.T) {
|
||||
|
||||
// The global MeterProvider should directly provide a Meter instance that
|
||||
// can be updated. If the SetMeterProvider is called after a Meter was
|
||||
// obtained, but before instruments only the instrument should be generated
|
||||
@ -227,7 +223,6 @@ func TestMeterDelegatesCalls(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMeterDefersDelegations(t *testing.T) {
|
||||
|
||||
// If SetMeterProvider is called after instruments are registered, the
|
||||
// instruments should be recreated with the new meter.
|
||||
|
||||
|
@ -14,6 +14,7 @@
|
||||
|
||||
package unit // import "go.opentelemetry.io/otel/metric/unit"
|
||||
|
||||
// Unit is a determinate standard quantity of measurement.
|
||||
type Unit string
|
||||
|
||||
// Units defined by OpenTelemetry.
|
||||
|
1
sdk/internal/env/env_test.go
vendored
1
sdk/internal/env/env_test.go
vendored
@ -114,7 +114,6 @@ func TestEnvParse(t *testing.T) {
|
||||
require.NoError(t, os.Setenv(key, invalid))
|
||||
assert.Equal(t, defVal, tc.f(defVal), "invalid value")
|
||||
})
|
||||
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -51,7 +51,7 @@ type Aggregator interface {
|
||||
//
|
||||
// The Context argument comes from user-level code and could be
|
||||
// inspected for a `correlation.Map` or `trace.SpanContext`.
|
||||
Update(ctx context.Context, number number.Number, descriptor *sdkapi.Descriptor) error
|
||||
Update(ctx context.Context, n number.Number, descriptor *sdkapi.Descriptor) error
|
||||
|
||||
// SynchronizedMove is called during collection to finish one
|
||||
// period of aggregation by atomically saving the
|
||||
|
@ -32,14 +32,19 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/metric/sdkapi"
|
||||
)
|
||||
|
||||
// Magnitude is the upper-bound of random numbers used in profile tests.
|
||||
const Magnitude = 1000
|
||||
|
||||
// Profile is an aggregator test profile.
|
||||
type Profile struct {
|
||||
NumberKind number.Kind
|
||||
Random func(sign int) number.Number
|
||||
}
|
||||
|
||||
// NoopAggregator is an aggregator that performs no operations.
|
||||
type NoopAggregator struct{}
|
||||
|
||||
// NoopAggregation is an aggregation that performs no operations.
|
||||
type NoopAggregation struct{}
|
||||
|
||||
var _ aggregator.Aggregator = NoopAggregator{}
|
||||
@ -63,11 +68,13 @@ func newProfiles() []Profile {
|
||||
}
|
||||
}
|
||||
|
||||
// NewAggregatorTest returns a descriptor for mkind and nkind.
|
||||
func NewAggregatorTest(mkind sdkapi.InstrumentKind, nkind number.Kind) *sdkapi.Descriptor {
|
||||
desc := sdkapi.NewDescriptor("test.name", mkind, nkind, "", "")
|
||||
return &desc
|
||||
}
|
||||
|
||||
// RunProfiles runs all test profile against the factory function f.
|
||||
func RunProfiles(t *testing.T, f func(*testing.T, Profile)) {
|
||||
for _, profile := range newProfiles() {
|
||||
t.Run(profile.NumberKind.String(), func(t *testing.T) {
|
||||
@ -85,44 +92,54 @@ func TestMain(m *testing.M) {
|
||||
},
|
||||
}
|
||||
if !ottest.Aligned8Byte(fields, os.Stderr) {
|
||||
// nolint:revive // this is a main func, allow Exit.
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// nolint:revive // this is a main func, allow Exit.
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
|
||||
// Numbers are a collection of measured data point values.
|
||||
type Numbers struct {
|
||||
// numbers has to be aligned for 64-bit atomic operations.
|
||||
numbers []number.Number
|
||||
kind number.Kind
|
||||
}
|
||||
|
||||
// NewNumbers returns a new Numbers for the passed kind.
|
||||
func NewNumbers(kind number.Kind) Numbers {
|
||||
return Numbers{
|
||||
kind: kind,
|
||||
}
|
||||
}
|
||||
|
||||
// Append appends v to the numbers n.
|
||||
func (n *Numbers) Append(v number.Number) {
|
||||
n.numbers = append(n.numbers, v)
|
||||
}
|
||||
|
||||
// Sort sorts all the numbers contained in n.
|
||||
func (n *Numbers) Sort() {
|
||||
sort.Sort(n)
|
||||
}
|
||||
|
||||
// Less returns if the number at index i is less than the number at index j.
|
||||
func (n *Numbers) Less(i, j int) bool {
|
||||
return n.numbers[i].CompareNumber(n.kind, n.numbers[j]) < 0
|
||||
}
|
||||
|
||||
// Len returns number of data points Numbers contains.
|
||||
func (n *Numbers) Len() int {
|
||||
return len(n.numbers)
|
||||
}
|
||||
|
||||
// Swap swaps the location of the numbers at index i and j.
|
||||
func (n *Numbers) Swap(i, j int) {
|
||||
n.numbers[i], n.numbers[j] = n.numbers[j], n.numbers[i]
|
||||
}
|
||||
|
||||
// Sum returns the sum of all data points.
|
||||
func (n *Numbers) Sum() number.Number {
|
||||
var sum number.Number
|
||||
for _, num := range n.numbers {
|
||||
@ -131,65 +148,78 @@ func (n *Numbers) Sum() number.Number {
|
||||
return sum
|
||||
}
|
||||
|
||||
// Count returns the number of data points Numbers contains.
|
||||
func (n *Numbers) Count() uint64 {
|
||||
return uint64(len(n.numbers))
|
||||
}
|
||||
|
||||
// Min returns the min number.
|
||||
func (n *Numbers) Min() number.Number {
|
||||
return n.numbers[0]
|
||||
}
|
||||
|
||||
// Max returns the max number.
|
||||
func (n *Numbers) Max() number.Number {
|
||||
return n.numbers[len(n.numbers)-1]
|
||||
}
|
||||
|
||||
// Points returns the slice of number for all data points.
|
||||
func (n *Numbers) Points() []number.Number {
|
||||
return n.numbers
|
||||
}
|
||||
|
||||
// CheckedUpdate performs the same range test the SDK does on behalf of the aggregator.
|
||||
func CheckedUpdate(t *testing.T, agg aggregator.Aggregator, number number.Number, descriptor *sdkapi.Descriptor) {
|
||||
func CheckedUpdate(t *testing.T, agg aggregator.Aggregator, n number.Number, descriptor *sdkapi.Descriptor) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Note: Aggregator tests are written assuming that the SDK
|
||||
// has performed the RangeTest. Therefore we skip errors that
|
||||
// would have been detected by the RangeTest.
|
||||
err := aggregator.RangeTest(number, descriptor)
|
||||
err := aggregator.RangeTest(n, descriptor)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err := agg.Update(ctx, number, descriptor); err != nil {
|
||||
if err := agg.Update(ctx, n, descriptor); err != nil {
|
||||
t.Error("Unexpected Update failure", err)
|
||||
}
|
||||
}
|
||||
|
||||
// CheckedMerge verifies aggFrom merges into aggInto with the scope of
|
||||
// descriptor.
|
||||
func CheckedMerge(t *testing.T, aggInto, aggFrom aggregator.Aggregator, descriptor *sdkapi.Descriptor) {
|
||||
if err := aggInto.Merge(aggFrom, descriptor); err != nil {
|
||||
t.Error("Unexpected Merge failure", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Kind returns a Noop aggregation Kind.
|
||||
func (NoopAggregation) Kind() aggregation.Kind {
|
||||
return aggregation.Kind("Noop")
|
||||
}
|
||||
|
||||
// Aggregation returns a NoopAggregation.
|
||||
func (NoopAggregator) Aggregation() aggregation.Aggregation {
|
||||
return NoopAggregation{}
|
||||
}
|
||||
|
||||
// Update performs no operation.
|
||||
func (NoopAggregator) Update(context.Context, number.Number, *sdkapi.Descriptor) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SynchronizedMove performs no operation.
|
||||
func (NoopAggregator) SynchronizedMove(aggregator.Aggregator, *sdkapi.Descriptor) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Merge performs no operation.
|
||||
func (NoopAggregator) Merge(aggregator.Aggregator, *sdkapi.Descriptor) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SynchronizedMoveResetTest tests SynchronizedMove behavior for an aggregator
|
||||
// during resets.
|
||||
func SynchronizedMoveResetTest(t *testing.T, mkind sdkapi.InstrumentKind, nf func(*sdkapi.Descriptor) aggregator.Aggregator) {
|
||||
t.Run("reset on nil", func(t *testing.T) {
|
||||
// Ensures that SynchronizedMove(nil, descriptor) discards and
|
||||
@ -272,8 +302,6 @@ func SynchronizedMoveResetTest(t *testing.T, mkind sdkapi.InstrumentKind, nf fun
|
||||
require.Equal(t, input, v)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
})
|
||||
})
|
||||
|
||||
}
|
||||
|
@ -219,9 +219,9 @@ func (c *Aggregator) clearState() {
|
||||
}
|
||||
|
||||
// Update adds the recorded measurement to the current data set.
|
||||
func (c *Aggregator) Update(_ context.Context, number number.Number, desc *sdkapi.Descriptor) error {
|
||||
func (c *Aggregator) Update(_ context.Context, n number.Number, desc *sdkapi.Descriptor) error {
|
||||
kind := desc.NumberKind()
|
||||
asFloat := number.CoerceToFloat64(kind)
|
||||
asFloat := n.CoerceToFloat64(kind)
|
||||
|
||||
bucketID := len(c.boundaries)
|
||||
for i, boundary := range c.boundaries {
|
||||
@ -246,7 +246,7 @@ func (c *Aggregator) Update(_ context.Context, number number.Number, desc *sdkap
|
||||
defer c.lock.Unlock()
|
||||
|
||||
c.state.count++
|
||||
c.state.sum.AddNumber(kind, number)
|
||||
c.state.sum.AddNumber(kind, n)
|
||||
c.state.bucketCounts[bucketID]++
|
||||
|
||||
return nil
|
||||
|
@ -193,7 +193,6 @@ func TestHistogramNotSet(t *testing.T) {
|
||||
// checkHistogram ensures the correct aggregated state between `all`
|
||||
// (test aggregator) and `agg` (code under test).
|
||||
func checkHistogram(t *testing.T, all aggregatortest.Numbers, profile aggregatortest.Profile, agg *histogram.Aggregator) {
|
||||
|
||||
all.Sort()
|
||||
|
||||
asum, err := agg.Sum()
|
||||
|
@ -104,9 +104,9 @@ func (g *Aggregator) SynchronizedMove(oa aggregator.Aggregator, _ *sdkapi.Descri
|
||||
}
|
||||
|
||||
// Update atomically sets the current "last" value.
|
||||
func (g *Aggregator) Update(_ context.Context, number number.Number, desc *sdkapi.Descriptor) error {
|
||||
func (g *Aggregator) Update(_ context.Context, n number.Number, desc *sdkapi.Descriptor) error {
|
||||
ngd := &lastValueData{
|
||||
value: number,
|
||||
value: n,
|
||||
timestamp: time.Now(),
|
||||
}
|
||||
atomic.StorePointer(&g.value, unsafe.Pointer(ngd))
|
||||
|
@ -81,6 +81,8 @@ type Controller struct {
|
||||
var _ export.InstrumentationLibraryReader = &Controller{}
|
||||
var _ metric.MeterProvider = &Controller{}
|
||||
|
||||
// Meter returns a new Meter defined by instrumentationName and configured
|
||||
// with opts.
|
||||
func (c *Controller) Meter(instrumentationName string, opts ...metric.MeterOption) metric.Meter {
|
||||
cfg := metric.NewMeterConfig(opts...)
|
||||
library := instrumentation.Library{
|
||||
@ -310,7 +312,7 @@ func (c *Controller) checkpointSingleAccumulator(ctx context.Context, ac *accumu
|
||||
|
||||
// export calls the exporter with a read lock on the Reader,
|
||||
// applying the configured export timeout.
|
||||
func (c *Controller) export(ctx context.Context) error {
|
||||
func (c *Controller) export(ctx context.Context) error { // nolint:revive // method name shadows import.
|
||||
if c.pushTimeout > 0 {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, c.pushTimeout)
|
||||
|
@ -118,5 +118,4 @@ func TestPullWithCollect(t *testing.T) {
|
||||
require.EqualValues(t, map[string]float64{
|
||||
"counter.sum/A=B/": 20,
|
||||
}, records.Map())
|
||||
|
||||
}
|
||||
|
@ -63,5 +63,4 @@ func TestEndToEnd(t *testing.T) {
|
||||
|
||||
h.lock.Lock()
|
||||
require.Len(t, h.errors, 0)
|
||||
|
||||
}
|
||||
|
@ -25,10 +25,12 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
|
||||
)
|
||||
|
||||
// MockClock is a Clock used for testing.
|
||||
type MockClock struct {
|
||||
mock *clock.Mock
|
||||
}
|
||||
|
||||
// MockTicker is a Ticker used for testing.
|
||||
type MockTicker struct {
|
||||
ticker *clock.Ticker
|
||||
}
|
||||
@ -36,26 +38,33 @@ type MockTicker struct {
|
||||
var _ controllerTime.Clock = MockClock{}
|
||||
var _ controllerTime.Ticker = MockTicker{}
|
||||
|
||||
// NewMockClock returns a new unset MockClock.
|
||||
func NewMockClock() MockClock {
|
||||
return MockClock{clock.NewMock()}
|
||||
}
|
||||
|
||||
// Now returns the current time.
|
||||
func (c MockClock) Now() time.Time {
|
||||
return c.mock.Now()
|
||||
}
|
||||
|
||||
// Ticker creates a new instance of a Ticker.
|
||||
func (c MockClock) Ticker(period time.Duration) controllerTime.Ticker {
|
||||
return MockTicker{c.mock.Ticker(period)}
|
||||
}
|
||||
|
||||
// Add moves the current time of the MockClock forward by the specified
|
||||
// duration.
|
||||
func (c MockClock) Add(d time.Duration) {
|
||||
c.mock.Add(d)
|
||||
}
|
||||
|
||||
// Stop turns off the MockTicker.
|
||||
func (t MockTicker) Stop() {
|
||||
t.ticker.Stop()
|
||||
}
|
||||
|
||||
// C returns a channel that receives the current time when MockTicker ticks.
|
||||
func (t MockTicker) C() <-chan time.Time {
|
||||
return t.ticker.C
|
||||
}
|
||||
|
@ -16,44 +16,52 @@ package time // import "go.opentelemetry.io/otel/sdk/metric/controller/time"
|
||||
|
||||
import (
|
||||
"time"
|
||||
lib "time"
|
||||
)
|
||||
|
||||
// Several types below are created to match "github.com/benbjohnson/clock"
|
||||
// so that it remains a test-only dependency.
|
||||
|
||||
// Clock keeps track of time for a metric SDK.
|
||||
type Clock interface {
|
||||
Now() lib.Time
|
||||
Ticker(duration lib.Duration) Ticker
|
||||
Now() time.Time
|
||||
Ticker(duration time.Duration) Ticker
|
||||
}
|
||||
|
||||
// Ticker signals time intervals.
|
||||
type Ticker interface {
|
||||
Stop()
|
||||
C() <-chan lib.Time
|
||||
C() <-chan time.Time
|
||||
}
|
||||
|
||||
// RealClock wraps the time package and uses the system time to tell time.
|
||||
type RealClock struct {
|
||||
}
|
||||
|
||||
// RealTicker wraps the time package and uses system time to tick time
|
||||
// intervals.
|
||||
type RealTicker struct {
|
||||
ticker *lib.Ticker
|
||||
ticker *time.Ticker
|
||||
}
|
||||
|
||||
var _ Clock = RealClock{}
|
||||
var _ Ticker = RealTicker{}
|
||||
|
||||
// Now returns the current time.
|
||||
func (RealClock) Now() time.Time {
|
||||
return time.Now()
|
||||
}
|
||||
|
||||
// Ticker creates a new RealTicker that will tick with period.
|
||||
func (RealClock) Ticker(period time.Duration) Ticker {
|
||||
return RealTicker{time.NewTicker(period)}
|
||||
}
|
||||
|
||||
// Stop turns off the RealTicker.
|
||||
func (t RealTicker) Stop() {
|
||||
t.ticker.Stop()
|
||||
}
|
||||
|
||||
// C returns a channel that receives the current time when RealTicker ticks.
|
||||
func (t RealTicker) C() <-chan time.Time {
|
||||
return t.ticker.C
|
||||
}
|
||||
|
@ -209,7 +209,6 @@ func TestSDKAttrsDeduplication(t *testing.T) {
|
||||
|
||||
allExpect := map[string]float64{}
|
||||
for numKeys := 0; numKeys < maxKeys; numKeys++ {
|
||||
|
||||
var kvsA []attribute.KeyValue
|
||||
var kvsB []attribute.KeyValue
|
||||
for r := 0; r < repeats; r++ {
|
||||
@ -240,7 +239,6 @@ func TestSDKAttrsDeduplication(t *testing.T) {
|
||||
counter.Add(ctx, 1, kvsB...)
|
||||
allExpect[format(expectB)] += 2
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
sdk.Collect(ctx)
|
||||
|
@ -100,7 +100,7 @@ const (
|
||||
// Sentinel errors for Aggregation interface.
|
||||
var (
|
||||
ErrNegativeInput = fmt.Errorf("negative value is out of range for this instrument")
|
||||
ErrNaNInput = fmt.Errorf("NaN value is an invalid input")
|
||||
ErrNaNInput = fmt.Errorf("invalid input value: NaN")
|
||||
ErrInconsistentType = fmt.Errorf("inconsistent aggregator types")
|
||||
|
||||
// ErrNoCumulativeToDelta is returned when requesting delta
|
||||
|
@ -93,7 +93,7 @@ type AggregatorSelector interface {
|
||||
// Note: This is context-free because the aggregator should
|
||||
// not relate to the incoming context. This call should not
|
||||
// block.
|
||||
AggregatorFor(descriptor *sdkapi.Descriptor, aggregator ...*aggregator.Aggregator)
|
||||
AggregatorFor(descriptor *sdkapi.Descriptor, agg ...*aggregator.Aggregator)
|
||||
}
|
||||
|
||||
// Checkpointer is the interface used by a Controller to coordinate
|
||||
@ -141,7 +141,7 @@ type Exporter interface {
|
||||
//
|
||||
// The InstrumentationLibraryReader interface refers to the
|
||||
// Processor that just completed collection.
|
||||
Export(ctx context.Context, resource *resource.Resource, reader InstrumentationLibraryReader) error
|
||||
Export(ctx context.Context, res *resource.Resource, reader InstrumentationLibraryReader) error
|
||||
|
||||
// TemporalitySelector is an interface used by the Processor
|
||||
// in deciding whether to compute Delta or Cumulative
|
||||
@ -232,13 +232,13 @@ func (m Metadata) Attributes() *attribute.Set {
|
||||
// Accumulations to send to Processors. The Descriptor, attributes, and
|
||||
// Aggregator represent aggregate metric events received over a single
|
||||
// collection period.
|
||||
func NewAccumulation(descriptor *sdkapi.Descriptor, attrs *attribute.Set, aggregator aggregator.Aggregator) Accumulation {
|
||||
func NewAccumulation(descriptor *sdkapi.Descriptor, attrs *attribute.Set, agg aggregator.Aggregator) Accumulation {
|
||||
return Accumulation{
|
||||
Metadata: Metadata{
|
||||
descriptor: descriptor,
|
||||
attrs: attrs,
|
||||
},
|
||||
aggregator: aggregator,
|
||||
aggregator: agg,
|
||||
}
|
||||
}
|
||||
|
||||
@ -251,13 +251,13 @@ func (r Accumulation) Aggregator() aggregator.Aggregator {
|
||||
// NewRecord allows Processor implementations to construct export records.
|
||||
// The Descriptor, attributes, and Aggregator represent aggregate metric
|
||||
// events received over a single collection period.
|
||||
func NewRecord(descriptor *sdkapi.Descriptor, attrs *attribute.Set, aggregation aggregation.Aggregation, start, end time.Time) Record {
|
||||
func NewRecord(descriptor *sdkapi.Descriptor, attrs *attribute.Set, agg aggregation.Aggregation, start, end time.Time) Record {
|
||||
return Record{
|
||||
Metadata: Metadata{
|
||||
descriptor: descriptor,
|
||||
attrs: attrs,
|
||||
},
|
||||
aggregation: aggregation,
|
||||
aggregation: agg,
|
||||
start: start,
|
||||
end: end,
|
||||
}
|
||||
|
@ -61,7 +61,6 @@ func TestSyncInstruments(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
assert.InDelta(t, 3.0, out.Sum.AsFloat64(), 0.0001)
|
||||
assert.Equal(t, aggregation.SumKind, out.AggregationKind)
|
||||
|
||||
})
|
||||
|
||||
t.Run("Float Histogram", func(t *testing.T) {
|
||||
@ -94,7 +93,6 @@ func TestSyncInstruments(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, 22, out.Sum.AsInt64())
|
||||
assert.Equal(t, aggregation.SumKind, out.AggregationKind)
|
||||
|
||||
})
|
||||
t.Run("Int UpDownCounter", func(t *testing.T) {
|
||||
iudcnt, err := meter.SyncInt64().UpDownCounter("iUDCount")
|
||||
@ -109,10 +107,8 @@ func TestSyncInstruments(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, 23, out.Sum.AsInt64())
|
||||
assert.Equal(t, aggregation.SumKind, out.AggregationKind)
|
||||
|
||||
})
|
||||
t.Run("Int Histogram", func(t *testing.T) {
|
||||
|
||||
ihis, err := meter.SyncInt64().Histogram("iHist")
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -163,7 +159,6 @@ func TestSyncDeltaInstruments(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
assert.InDelta(t, 3.0, out.Sum.AsFloat64(), 0.0001)
|
||||
assert.Equal(t, aggregation.SumKind, out.AggregationKind)
|
||||
|
||||
})
|
||||
|
||||
t.Run("Float Histogram", func(t *testing.T) {
|
||||
@ -196,7 +191,6 @@ func TestSyncDeltaInstruments(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, 22, out.Sum.AsInt64())
|
||||
assert.Equal(t, aggregation.SumKind, out.AggregationKind)
|
||||
|
||||
})
|
||||
t.Run("Int UpDownCounter", func(t *testing.T) {
|
||||
iudcnt, err := meter.SyncInt64().UpDownCounter("iUDCount")
|
||||
@ -211,10 +205,8 @@ func TestSyncDeltaInstruments(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, 23, out.Sum.AsInt64())
|
||||
assert.Equal(t, aggregation.SumKind, out.AggregationKind)
|
||||
|
||||
})
|
||||
t.Run("Int Histogram", func(t *testing.T) {
|
||||
|
||||
ihis, err := meter.SyncInt64().Histogram("iHist")
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -349,7 +341,6 @@ func TestAsyncInstruments(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, 23, out.Sum.AsInt64())
|
||||
assert.Equal(t, aggregation.SumKind, out.AggregationKind)
|
||||
|
||||
})
|
||||
t.Run("Int Gauge", func(t *testing.T) {
|
||||
meter := mp.Meter("go.opentelemetry.io/otel/sdk/metric/metrictest/exporter_TestAsyncCounter_IntGauge")
|
||||
@ -373,7 +364,6 @@ func TestAsyncInstruments(t *testing.T) {
|
||||
assert.EqualValues(t, 25, out.LastValue.AsInt64())
|
||||
assert.Equal(t, aggregation.LastValueKind, out.AggregationKind)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func ExampleExporter_GetByName() {
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
)
|
||||
|
||||
type (
|
||||
// Processor is a basic metric processor.
|
||||
Processor struct {
|
||||
aggregation.TemporalitySelector
|
||||
export.AggregatorSelector
|
||||
@ -129,6 +130,7 @@ type factory struct {
|
||||
config config
|
||||
}
|
||||
|
||||
// NewFactory returns a new basic CheckpointerFactory.
|
||||
func NewFactory(aselector export.AggregatorSelector, tselector aggregation.TemporalitySelector, opts ...Option) export.CheckpointerFactory {
|
||||
var config config
|
||||
for _, opt := range opts {
|
||||
@ -156,7 +158,6 @@ func (f factory) NewCheckpointer() export.Checkpointer {
|
||||
},
|
||||
}
|
||||
return p
|
||||
|
||||
}
|
||||
|
||||
// Process implements export.Processor.
|
||||
|
@ -36,7 +36,6 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/metric/number"
|
||||
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
|
||||
"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
|
||||
processorTest "go.opentelemetry.io/otel/sdk/metric/processor/processortest"
|
||||
"go.opentelemetry.io/otel/sdk/metric/sdkapi"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
@ -138,7 +137,7 @@ func testProcessor(
|
||||
|
||||
// Note: this selector uses the instrument name to dictate
|
||||
// aggregation kind.
|
||||
selector := processorTest.AggregatorSelector()
|
||||
selector := processortest.AggregatorSelector()
|
||||
|
||||
labs1 := []attribute.KeyValue{attribute.String("L1", "V")}
|
||||
labs2 := []attribute.KeyValue{attribute.String("L2", "V")}
|
||||
@ -152,7 +151,6 @@ func testProcessor(
|
||||
desc2 := metrictest.NewDescriptor(fmt.Sprint("inst2", instSuffix), mkind, nkind)
|
||||
|
||||
for nc := 0; nc < nCheckpoint; nc++ {
|
||||
|
||||
// The input is 10 per update, scaled by
|
||||
// the number of checkpoints for
|
||||
// cumulative instruments:
|
||||
@ -188,7 +186,7 @@ func testProcessor(
|
||||
}
|
||||
|
||||
// Test the final checkpoint state.
|
||||
records1 := processorTest.NewOutput(attribute.DefaultEncoder())
|
||||
records1 := processortest.NewOutput(attribute.DefaultEncoder())
|
||||
require.NoError(t, reader.ForEach(aggregation.ConstantTemporalitySelector(aggTemp), records1.AddRecord))
|
||||
|
||||
if !expectConversion {
|
||||
@ -274,19 +272,19 @@ func (bogusExporter) Export(context.Context, export.Reader) error {
|
||||
|
||||
func TestBasicInconsistent(t *testing.T) {
|
||||
// Test double-start
|
||||
b := basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
b := basic.New(processortest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
|
||||
b.StartCollection()
|
||||
b.StartCollection()
|
||||
require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())
|
||||
|
||||
// Test finish without start
|
||||
b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
b = basic.New(processortest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
|
||||
require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())
|
||||
|
||||
// Test no finish
|
||||
b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
b = basic.New(processortest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
|
||||
b.StartCollection()
|
||||
require.Equal(
|
||||
@ -299,14 +297,14 @@ func TestBasicInconsistent(t *testing.T) {
|
||||
)
|
||||
|
||||
// Test no start
|
||||
b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
b = basic.New(processortest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
|
||||
desc := metrictest.NewDescriptor("inst", sdkapi.CounterInstrumentKind, number.Int64Kind)
|
||||
accum := export.NewAccumulation(&desc, attribute.EmptySet(), aggregatortest.NoopAggregator{})
|
||||
require.Equal(t, basic.ErrInconsistentState, b.Process(accum))
|
||||
|
||||
// Test invalid kind:
|
||||
b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
b = basic.New(processortest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
b.StartCollection()
|
||||
require.NoError(t, b.Process(accum))
|
||||
require.NoError(t, b.FinishCollection())
|
||||
@ -316,13 +314,12 @@ func TestBasicInconsistent(t *testing.T) {
|
||||
func(export.Record) error { return nil },
|
||||
)
|
||||
require.True(t, errors.Is(err, basic.ErrInvalidTemporality))
|
||||
|
||||
}
|
||||
|
||||
func TestBasicTimestamps(t *testing.T) {
|
||||
beforeNew := time.Now()
|
||||
time.Sleep(time.Nanosecond)
|
||||
b := basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
b := basic.New(processortest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
|
||||
time.Sleep(time.Nanosecond)
|
||||
afterNew := time.Now()
|
||||
|
||||
@ -372,7 +369,7 @@ func TestStatefulNoMemoryCumulative(t *testing.T) {
|
||||
aggTempSel := aggregation.CumulativeTemporalitySelector()
|
||||
|
||||
desc := metrictest.NewDescriptor("inst.sum", sdkapi.CounterInstrumentKind, number.Int64Kind)
|
||||
selector := processorTest.AggregatorSelector()
|
||||
selector := processortest.AggregatorSelector()
|
||||
|
||||
processor := basic.New(selector, aggTempSel, basic.WithMemory(false))
|
||||
reader := processor.Reader()
|
||||
@ -383,7 +380,7 @@ func TestStatefulNoMemoryCumulative(t *testing.T) {
|
||||
require.NoError(t, processor.FinishCollection())
|
||||
|
||||
// Verify zero elements
|
||||
records := processorTest.NewOutput(attribute.DefaultEncoder())
|
||||
records := processortest.NewOutput(attribute.DefaultEncoder())
|
||||
require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
|
||||
require.EqualValues(t, map[string]float64{}, records.Map())
|
||||
|
||||
@ -393,7 +390,7 @@ func TestStatefulNoMemoryCumulative(t *testing.T) {
|
||||
require.NoError(t, processor.FinishCollection())
|
||||
|
||||
// Verify one element
|
||||
records = processorTest.NewOutput(attribute.DefaultEncoder())
|
||||
records = processortest.NewOutput(attribute.DefaultEncoder())
|
||||
require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
|
||||
require.EqualValues(t, map[string]float64{
|
||||
"inst.sum/A=B/": float64(i * 10),
|
||||
@ -413,7 +410,7 @@ func TestMultiObserverSum(t *testing.T) {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
aggTempSel := test.TemporalitySelector
|
||||
desc := metrictest.NewDescriptor("observe.sum", sdkapi.CounterObserverInstrumentKind, number.Int64Kind)
|
||||
selector := processorTest.AggregatorSelector()
|
||||
selector := processortest.AggregatorSelector()
|
||||
|
||||
processor := basic.New(selector, aggTempSel, basic.WithMemory(false))
|
||||
reader := processor.Reader()
|
||||
@ -427,7 +424,7 @@ func TestMultiObserverSum(t *testing.T) {
|
||||
require.NoError(t, processor.FinishCollection())
|
||||
|
||||
// Verify one element
|
||||
records := processorTest.NewOutput(attribute.DefaultEncoder())
|
||||
records := processortest.NewOutput(attribute.DefaultEncoder())
|
||||
if test.expectProcessErr == nil {
|
||||
require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
|
||||
require.EqualValues(t, map[string]float64{
|
||||
@ -446,7 +443,7 @@ func TestCounterObserverEndToEnd(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
eselector := aggregation.CumulativeTemporalitySelector()
|
||||
proc := basic.New(
|
||||
processorTest.AggregatorSelector(),
|
||||
processortest.AggregatorSelector(),
|
||||
eselector,
|
||||
)
|
||||
accum := sdk.NewAccumulator(proc)
|
||||
|
@ -23,6 +23,7 @@ type config struct {
|
||||
Memory bool
|
||||
}
|
||||
|
||||
// Option configures a basic processor configuration.
|
||||
type Option interface {
|
||||
applyProcessor(config) config
|
||||
}
|
||||
|
@ -95,6 +95,8 @@ type testFactory struct {
|
||||
encoder attribute.Encoder
|
||||
}
|
||||
|
||||
// NewCheckpointerFactory returns a new CheckpointerFactory for the selector
|
||||
// and encoder pair.
|
||||
func NewCheckpointerFactory(selector export.AggregatorSelector, encoder attribute.Encoder) export.CheckpointerFactory {
|
||||
return testFactory{
|
||||
selector: selector,
|
||||
@ -102,6 +104,7 @@ func NewCheckpointerFactory(selector export.AggregatorSelector, encoder attribut
|
||||
}
|
||||
}
|
||||
|
||||
// NewCheckpointer returns a new Checkpointer for Processor p.
|
||||
func NewCheckpointer(p *Processor) export.Checkpointer {
|
||||
return &testCheckpointer{
|
||||
Processor: p,
|
||||
@ -179,7 +182,6 @@ func AggregatorSelector() export.AggregatorSelector {
|
||||
|
||||
// AggregatorFor implements export.AggregatorSelector.
|
||||
func (testAggregatorSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...*aggregator.Aggregator) {
|
||||
|
||||
switch {
|
||||
case strings.HasSuffix(desc.Name(), ".disabled"):
|
||||
for i := range aggPtrs {
|
||||
@ -240,10 +242,12 @@ func (o *Output) AddRecord(rec export.Record) error {
|
||||
return o.AddRecordWithResource(rec, resource.Empty())
|
||||
}
|
||||
|
||||
// AddRecordWithResource merges rec into this Output.
|
||||
func (o *Output) AddInstrumentationLibraryRecord(_ instrumentation.Library, rec export.Record) error {
|
||||
return o.AddRecordWithResource(rec, resource.Empty())
|
||||
}
|
||||
|
||||
// AddRecordWithResource merges rec into this Output scoping it with res.
|
||||
func (o *Output) AddRecordWithResource(rec export.Record, res *resource.Resource) error {
|
||||
key := mapKey{
|
||||
desc: rec.Descriptor(),
|
||||
@ -331,6 +335,7 @@ func New(selector aggregation.TemporalitySelector, encoder attribute.Encoder) *E
|
||||
}
|
||||
}
|
||||
|
||||
// Export records all the measurements aggregated in ckpt for res.
|
||||
func (e *Exporter) Export(_ context.Context, res *resource.Resource, ckpt export.InstrumentationLibraryReader) error {
|
||||
e.output.Lock()
|
||||
defer e.output.Unlock()
|
||||
@ -374,6 +379,8 @@ func (e *Exporter) Reset() {
|
||||
e.exportCount = 0
|
||||
}
|
||||
|
||||
// OneInstrumentationLibraryReader returns an InstrumentationLibraryReader for
|
||||
// a single instrumentation library.
|
||||
func OneInstrumentationLibraryReader(l instrumentation.Library, r export.Reader) export.InstrumentationLibraryReader {
|
||||
return oneLibraryReader{l, r}
|
||||
}
|
||||
@ -387,6 +394,8 @@ func (o oneLibraryReader) ForEach(readerFunc func(instrumentation.Library, expor
|
||||
return readerFunc(o.library, o.reader)
|
||||
}
|
||||
|
||||
// MultiInstrumentationLibraryReader returns an InstrumentationLibraryReader
|
||||
// for a group of records that came from multiple instrumentation libraries.
|
||||
func MultiInstrumentationLibraryReader(records map[instrumentation.Library][]export.Record) export.InstrumentationLibraryReader {
|
||||
return instrumentationLibraryReader{records: records}
|
||||
}
|
||||
|
@ -27,7 +27,6 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/metric/export"
|
||||
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
|
||||
processorTest "go.opentelemetry.io/otel/sdk/metric/processor/processortest"
|
||||
"go.opentelemetry.io/otel/sdk/metric/sdkapi"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
@ -58,9 +57,9 @@ func generateTestData(t *testing.T, proc export.Processor) {
|
||||
func TestProcessorTesting(t *testing.T) {
|
||||
// Test the Processor test helper using a real Accumulator to
|
||||
// generate Accumulations.
|
||||
checkpointer := processorTest.NewCheckpointer(
|
||||
processorTest.NewProcessor(
|
||||
processorTest.AggregatorSelector(),
|
||||
checkpointer := processortest.NewCheckpointer(
|
||||
processortest.NewProcessor(
|
||||
processortest.AggregatorSelector(),
|
||||
attribute.DefaultEncoder(),
|
||||
),
|
||||
)
|
||||
@ -75,7 +74,7 @@ func TestProcessorTesting(t *testing.T) {
|
||||
}
|
||||
|
||||
// Export the data and validate it again.
|
||||
exporter := processorTest.New(
|
||||
exporter := processortest.New(
|
||||
aggregation.StatelessTemporalitySelector(),
|
||||
attribute.DefaultEncoder(),
|
||||
)
|
||||
|
@ -27,7 +27,6 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
|
||||
"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
|
||||
processorTest "go.opentelemetry.io/otel/sdk/metric/processor/processortest"
|
||||
"go.opentelemetry.io/otel/sdk/metric/processor/reducer"
|
||||
"go.opentelemetry.io/otel/sdk/metric/sdkapi"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
@ -73,12 +72,12 @@ func generateData(t *testing.T, impl sdkapi.MeterImpl) {
|
||||
}
|
||||
|
||||
func TestFilterProcessor(t *testing.T) {
|
||||
testProc := processorTest.NewProcessor(
|
||||
processorTest.AggregatorSelector(),
|
||||
testProc := processortest.NewProcessor(
|
||||
processortest.AggregatorSelector(),
|
||||
attribute.DefaultEncoder(),
|
||||
)
|
||||
accum := metricsdk.NewAccumulator(
|
||||
reducer.New(testFilter{}, processorTest.NewCheckpointer(testProc)),
|
||||
reducer.New(testFilter{}, processortest.NewCheckpointer(testProc)),
|
||||
)
|
||||
generateData(t, accum)
|
||||
|
||||
@ -92,11 +91,11 @@ func TestFilterProcessor(t *testing.T) {
|
||||
|
||||
// Test a filter with the ../basic Processor.
|
||||
func TestFilterBasicProcessor(t *testing.T) {
|
||||
basicProc := basic.New(processorTest.AggregatorSelector(), aggregation.CumulativeTemporalitySelector())
|
||||
basicProc := basic.New(processortest.AggregatorSelector(), aggregation.CumulativeTemporalitySelector())
|
||||
accum := metricsdk.NewAccumulator(
|
||||
reducer.New(testFilter{}, basicProc),
|
||||
)
|
||||
exporter := processorTest.New(basicProc, attribute.DefaultEncoder())
|
||||
exporter := processortest.New(basicProc, attribute.DefaultEncoder())
|
||||
|
||||
generateData(t, accum)
|
||||
|
||||
|
@ -130,6 +130,7 @@ func (u *UniqueInstrumentMeterImpl) NewAsyncInstrument(descriptor sdkapi.Descrip
|
||||
return asyncInst, nil
|
||||
}
|
||||
|
||||
// RegisterCallback registers callback with insts.
|
||||
func (u *UniqueInstrumentMeterImpl) RegisterCallback(insts []instrument.Asynchronous, callback func(context.Context)) error {
|
||||
u.lock.Lock()
|
||||
defer u.lock.Unlock()
|
||||
|
@ -128,6 +128,8 @@ var (
|
||||
// ErrUninitializedInstrument is returned when an instrument is used when uninitialized.
|
||||
ErrUninitializedInstrument = fmt.Errorf("use of an uninitialized instrument")
|
||||
|
||||
// ErrBadInstrument is returned when an instrument from another SDK is
|
||||
// attempted to be registered with this SDK.
|
||||
ErrBadInstrument = fmt.Errorf("use of a instrument from another SDK")
|
||||
)
|
||||
|
||||
@ -146,7 +148,6 @@ func (s *syncInstrument) Implementation() interface{} {
|
||||
// acquireHandle gets or creates a `*record` corresponding to `kvs`,
|
||||
// the input attributes.
|
||||
func (b *baseInstrument) acquireHandle(kvs []attribute.KeyValue) *record {
|
||||
|
||||
// This memory allocation may not be used, but it's
|
||||
// needed for the `sortSlice` field, to avoid an
|
||||
// allocation while sorting.
|
||||
@ -263,6 +264,7 @@ func (m *Accumulator) NewAsyncInstrument(descriptor sdkapi.Descriptor) (sdkapi.A
|
||||
return a, nil
|
||||
}
|
||||
|
||||
// RegisterCallback registers f to be called for insts.
|
||||
func (m *Accumulator) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error {
|
||||
cb := &callback{
|
||||
insts: map[*asyncInstrument]struct{}{},
|
||||
@ -418,5 +420,4 @@ func (m *Accumulator) fromAsync(async sdkapi.AsyncImpl) (*asyncInstrument, error
|
||||
return nil, ErrBadInstrument
|
||||
}
|
||||
return inst, nil
|
||||
|
||||
}
|
||||
|
@ -31,13 +31,13 @@ type Descriptor struct {
|
||||
}
|
||||
|
||||
// NewDescriptor returns a Descriptor with the given contents.
|
||||
func NewDescriptor(name string, ikind InstrumentKind, nkind number.Kind, description string, unit unit.Unit) Descriptor {
|
||||
func NewDescriptor(name string, ikind InstrumentKind, nkind number.Kind, description string, u unit.Unit) Descriptor {
|
||||
return Descriptor{
|
||||
name: name,
|
||||
instrumentKind: ikind,
|
||||
numberKind: nkind,
|
||||
description: description,
|
||||
unit: unit,
|
||||
unit: u,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -58,7 +58,7 @@ type SyncImpl interface {
|
||||
instrument.Synchronous
|
||||
|
||||
// RecordOne captures a single synchronous metric event.
|
||||
RecordOne(ctx context.Context, number number.Number, attrs []attribute.KeyValue)
|
||||
RecordOne(ctx context.Context, n number.Number, attrs []attribute.KeyValue)
|
||||
}
|
||||
|
||||
// AsyncImpl is an implementation-level interface to an
|
||||
@ -68,7 +68,7 @@ type AsyncImpl interface {
|
||||
instrument.Asynchronous
|
||||
|
||||
// ObserveOne captures a single synchronous metric event.
|
||||
ObserveOne(ctx context.Context, number number.Number, attrs []attribute.KeyValue)
|
||||
ObserveOne(ctx context.Context, n number.Number, attrs []attribute.KeyValue)
|
||||
}
|
||||
|
||||
// AsyncRunner is expected to convert into an AsyncSingleRunner or an
|
||||
@ -105,10 +105,10 @@ type AsyncBatchRunner interface {
|
||||
|
||||
// NewMeasurement constructs a single observation, a binding between
|
||||
// an asynchronous instrument and a number.
|
||||
func NewMeasurement(instrument SyncImpl, number number.Number) Measurement {
|
||||
func NewMeasurement(inst SyncImpl, n number.Number) Measurement {
|
||||
return Measurement{
|
||||
instrument: instrument,
|
||||
number: number,
|
||||
instrument: inst,
|
||||
number: n,
|
||||
}
|
||||
}
|
||||
|
||||
@ -134,10 +134,10 @@ func (m Measurement) Number() number.Number {
|
||||
|
||||
// NewObservation constructs a single observation, a binding between
|
||||
// an asynchronous instrument and a number.
|
||||
func NewObservation(instrument AsyncImpl, number number.Number) Observation {
|
||||
func NewObservation(inst AsyncImpl, n number.Number) Observation {
|
||||
return Observation{
|
||||
instrument: instrument,
|
||||
number: number,
|
||||
instrument: inst,
|
||||
number: n,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -42,10 +42,12 @@ type (
|
||||
fObserver struct{ AsyncImpl }
|
||||
)
|
||||
|
||||
// WrapMeterImpl wraps impl to be a full implementation of a Meter.
|
||||
func WrapMeterImpl(impl MeterImpl) metric.Meter {
|
||||
return meter{impl}
|
||||
}
|
||||
|
||||
// UnwrapMeterImpl unwraps the Meter to its bare MeterImpl.
|
||||
func UnwrapMeterImpl(m metric.Meter) MeterImpl {
|
||||
mm, ok := m.(meter)
|
||||
if !ok {
|
||||
|
@ -27,7 +27,6 @@ import (
|
||||
)
|
||||
|
||||
func TestDetect(t *testing.T) {
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
schema1, schema2 string
|
||||
|
@ -45,7 +45,6 @@ func makeAttrs(n int) (_, _ *resource.Resource) {
|
||||
} else {
|
||||
l2[i] = attribute.String(k, fmt.Sprint("v", rand.Intn(1000000000)))
|
||||
}
|
||||
|
||||
}
|
||||
return resource.NewSchemaless(l1...), resource.NewSchemaless(l2...)
|
||||
}
|
||||
|
@ -45,9 +45,9 @@ func TestStringDetectorErrors(t *testing.T) {
|
||||
{
|
||||
desc: "explicit error from func should be returned",
|
||||
s: resource.StringDetector("", attribute.Key("K"), func() (string, error) {
|
||||
return "", fmt.Errorf("K-IS-MISSING")
|
||||
return "", fmt.Errorf("k-is-missing")
|
||||
}),
|
||||
errContains: "K-IS-MISSING",
|
||||
errContains: "k-is-missing",
|
||||
},
|
||||
{
|
||||
desc: "empty key is an invalid",
|
||||
@ -74,5 +74,4 @@ func TestStringDetectorErrors(t *testing.T) {
|
||||
}
|
||||
require.EqualValues(t, map[string]string{"A": "B"}, m)
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ func fakeUnameProvider(buf *unix.Utsname) error {
|
||||
}
|
||||
|
||||
func fakeUnameProviderWithError(buf *unix.Utsname) error {
|
||||
return fmt.Errorf("Error invoking uname(2)")
|
||||
return fmt.Errorf("error invoking uname(2)")
|
||||
}
|
||||
|
||||
func TestUname(t *testing.T) {
|
||||
|
@ -56,10 +56,10 @@ var (
|
||||
|
||||
var (
|
||||
fakeExecutablePathProviderWithError = func() (string, error) {
|
||||
return "", fmt.Errorf("Unable to get process executable")
|
||||
return "", fmt.Errorf("unable to get process executable")
|
||||
}
|
||||
fakeOwnerProviderWithError = func() (*user.User, error) {
|
||||
return nil, fmt.Errorf("Unable to get process user")
|
||||
return nil, fmt.Errorf("unable to get process user")
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -129,6 +129,7 @@ func (r *Resource) Attributes() []attribute.KeyValue {
|
||||
return r.attrs.ToSlice()
|
||||
}
|
||||
|
||||
// SchemaURL returns the schema URL associated with Resource r.
|
||||
func (r *Resource) SchemaURL() string {
|
||||
if r == nil {
|
||||
return ""
|
||||
@ -179,13 +180,14 @@ func Merge(a, b *Resource) (*Resource, error) {
|
||||
|
||||
// Merge the schema URL.
|
||||
var schemaURL string
|
||||
if a.schemaURL == "" {
|
||||
switch true {
|
||||
case a.schemaURL == "":
|
||||
schemaURL = b.schemaURL
|
||||
} else if b.schemaURL == "" {
|
||||
case b.schemaURL == "":
|
||||
schemaURL = a.schemaURL
|
||||
} else if a.schemaURL == b.schemaURL {
|
||||
case a.schemaURL == b.schemaURL:
|
||||
schemaURL = a.schemaURL
|
||||
} else {
|
||||
default:
|
||||
return Empty(), errMergeConflictSchemaURL
|
||||
}
|
||||
|
||||
|
@ -35,8 +35,11 @@ const (
|
||||
DefaultMaxExportBatchSize = 512
|
||||
)
|
||||
|
||||
// BatchSpanProcessorOption configures a BatchSpanProcessor.
|
||||
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
|
||||
|
||||
// BatchSpanProcessorOptions is configuration settings for a
|
||||
// BatchSpanProcessor.
|
||||
type BatchSpanProcessorOptions struct {
|
||||
// MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the
|
||||
// queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior.
|
||||
@ -181,7 +184,7 @@ func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
|
||||
var err error
|
||||
if bsp.e != nil {
|
||||
flushCh := make(chan struct{})
|
||||
if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true) {
|
||||
if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) {
|
||||
select {
|
||||
case <-flushCh:
|
||||
// Processed any items in queue prior to ForceFlush being called
|
||||
@ -205,30 +208,43 @@ func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// WithMaxQueueSize returns a BatchSpanProcessorOption that configures the
|
||||
// maximum queue size allowed for a BatchSpanProcessor.
|
||||
func WithMaxQueueSize(size int) BatchSpanProcessorOption {
|
||||
return func(o *BatchSpanProcessorOptions) {
|
||||
o.MaxQueueSize = size
|
||||
}
|
||||
}
|
||||
|
||||
// WithMaxExportBatchSize returns a BatchSpanProcessorOption that configures
|
||||
// the maximum export batch size allowed for a BatchSpanProcessor.
|
||||
func WithMaxExportBatchSize(size int) BatchSpanProcessorOption {
|
||||
return func(o *BatchSpanProcessorOptions) {
|
||||
o.MaxExportBatchSize = size
|
||||
}
|
||||
}
|
||||
|
||||
// WithBatchTimeout returns a BatchSpanProcessorOption that configures the
|
||||
// maximum delay allowed for a BatchSpanProcessor before it will export any
|
||||
// held span (whether the queue is full or not).
|
||||
func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption {
|
||||
return func(o *BatchSpanProcessorOptions) {
|
||||
o.BatchTimeout = delay
|
||||
}
|
||||
}
|
||||
|
||||
// WithExportTimeout returns a BatchSpanProcessorOption that configures the
|
||||
// amount of time a BatchSpanProcessor waits for an exporter to export before
|
||||
// abandoning the export.
|
||||
func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption {
|
||||
return func(o *BatchSpanProcessorOptions) {
|
||||
o.ExportTimeout = timeout
|
||||
}
|
||||
}
|
||||
|
||||
// WithBlocking returns a BatchSpanProcessorOption that configures a
|
||||
// BatchSpanProcessor to wait for enqueue operations to succeed instead of
|
||||
// dropping data when the queue is full.
|
||||
func WithBlocking() BatchSpanProcessorOption {
|
||||
return func(o *BatchSpanProcessorOptions) {
|
||||
o.BlockOnQueueFull = true
|
||||
@ -237,7 +253,6 @@ func WithBlocking() BatchSpanProcessorOption {
|
||||
|
||||
// exportSpans is a subroutine of processing and draining the queue.
|
||||
func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
|
||||
|
||||
bsp.timer.Reset(bsp.o.BatchTimeout)
|
||||
|
||||
bsp.batchMutex.Lock()
|
||||
@ -335,28 +350,35 @@ func (bsp *batchSpanProcessor) drainQueue() {
|
||||
}
|
||||
|
||||
func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
|
||||
bsp.enqueueBlockOnQueueFull(context.TODO(), sd, bsp.o.BlockOnQueueFull)
|
||||
ctx := context.TODO()
|
||||
if bsp.o.BlockOnQueueFull {
|
||||
bsp.enqueueBlockOnQueueFull(ctx, sd)
|
||||
} else {
|
||||
bsp.enqueueDrop(ctx, sd)
|
||||
}
|
||||
}
|
||||
|
||||
func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) bool {
|
||||
func recoverSendOnClosedChan() {
|
||||
x := recover()
|
||||
switch err := x.(type) {
|
||||
case nil:
|
||||
return
|
||||
case runtime.Error:
|
||||
if err.Error() == "send on closed channel" {
|
||||
return
|
||||
}
|
||||
}
|
||||
panic(x)
|
||||
}
|
||||
|
||||
func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool {
|
||||
if !sd.SpanContext().IsSampled() {
|
||||
return false
|
||||
}
|
||||
|
||||
// This ensures the bsp.queue<- below does not panic as the
|
||||
// processor shuts down.
|
||||
defer func() {
|
||||
x := recover()
|
||||
switch err := x.(type) {
|
||||
case nil:
|
||||
return
|
||||
case runtime.Error:
|
||||
if err.Error() == "send on closed channel" {
|
||||
return
|
||||
}
|
||||
}
|
||||
panic(x)
|
||||
}()
|
||||
defer recoverSendOnClosedChan()
|
||||
|
||||
select {
|
||||
case <-bsp.stopCh:
|
||||
@ -364,13 +386,27 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
|
||||
default:
|
||||
}
|
||||
|
||||
if block {
|
||||
select {
|
||||
case bsp.queue <- sd:
|
||||
return true
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
}
|
||||
select {
|
||||
case bsp.queue <- sd:
|
||||
return true
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool {
|
||||
if !sd.SpanContext().IsSampled() {
|
||||
return false
|
||||
}
|
||||
|
||||
// This ensures the bsp.queue<- below does not panic as the
|
||||
// processor shuts down.
|
||||
defer recoverSendOnClosedChan()
|
||||
|
||||
select {
|
||||
case <-bsp.stopCh:
|
||||
return false
|
||||
default:
|
||||
}
|
||||
|
||||
select {
|
||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
x
Reference in New Issue
Block a user