1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-11-25 22:41:46 +02:00

Add global propagators (#494)

* Add global propagators

The default global propagators are set to the chained W3C trace and
correlation context propagators.

* Use global propagators in plugins

The httptrace and grpcplugins should also get some API for setting a
propagator to use (the othttp plugin already has such an API), but
that can come in some other PR.

* Decrease indentation in trace propagators

* Drop obsolete TODOs

Now we do "something" with correlation context - it ends up in the
context, and we put the context into the request, so the chained HTTP
handler can access it too.

The other TODO was about tag.Upsert which is long gone.

* Do not unnecessarily update the request context

The request context already contains the span (and we add some
attribute there), so inserting it into context again is pointless.

Co-authored-by: Joshua MacDonald <jmacd@users.noreply.github.com>
This commit is contained in:
Rahul Patel
2020-03-05 10:12:10 -08:00
committed by GitHub
parent 6769330394
commit 547d584da8
7 changed files with 85 additions and 58 deletions

View File

@@ -17,6 +17,7 @@ package global
import ( import (
"go.opentelemetry.io/otel/api/global/internal" "go.opentelemetry.io/otel/api/global/internal"
"go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/api/trace"
) )
@@ -48,3 +49,15 @@ func MeterProvider() metric.Provider {
func SetMeterProvider(mp metric.Provider) { func SetMeterProvider(mp metric.Provider) {
internal.SetMeterProvider(mp) internal.SetMeterProvider(mp)
} }
// Propagators returns the registered global propagators instance. If
// none is registered then an instance of propagators.NoopPropagators
// is returned.
func Propagators() propagation.Propagators {
return internal.Propagators()
}
// SetPropagators registers `p` as the global propagators instance.
func SetPropagators(p propagation.Propagators) {
internal.SetPropagators(p)
}

View File

@@ -4,7 +4,9 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"go.opentelemetry.io/otel/api/correlation"
"go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/api/trace"
) )
@@ -16,11 +18,16 @@ type (
meterProviderHolder struct { meterProviderHolder struct {
mp metric.Provider mp metric.Provider
} }
propagatorsHolder struct {
pr propagation.Propagators
}
) )
var ( var (
globalTracer = defaultTracerValue() globalTracer = defaultTracerValue()
globalMeter = defaultMeterValue() globalMeter = defaultMeterValue()
globalPropagators = defaultPropagatorsValue()
delegateMeterOnce sync.Once delegateMeterOnce sync.Once
delegateTraceOnce sync.Once delegateTraceOnce sync.Once
@@ -70,6 +77,16 @@ func SetMeterProvider(mp metric.Provider) {
globalMeter.Store(meterProviderHolder{mp: mp}) globalMeter.Store(meterProviderHolder{mp: mp})
} }
// Propagators is the internal implementation for global.Propagators.
func Propagators() propagation.Propagators {
return globalPropagators.Load().(propagatorsHolder).pr
}
// SetPropagators is the internal implementation for global.SetPropagators.
func SetPropagators(pr propagation.Propagators) {
globalPropagators.Store(propagatorsHolder{pr: pr})
}
func defaultTracerValue() *atomic.Value { func defaultTracerValue() *atomic.Value {
v := &atomic.Value{} v := &atomic.Value{}
v.Store(traceProviderHolder{tp: &traceProvider{}}) v.Store(traceProviderHolder{tp: &traceProvider{}})
@@ -82,10 +99,28 @@ func defaultMeterValue() *atomic.Value {
return v return v
} }
func defaultPropagatorsValue() *atomic.Value {
v := &atomic.Value{}
v.Store(propagatorsHolder{pr: getDefaultPropagators()})
return v
}
// getDefaultPropagators returns a default Propagators, configured
// with W3C trace and correlation context propagation.
func getDefaultPropagators() propagation.Propagators {
tcPropagator := trace.TraceContext{}
ccPropagator := correlation.CorrelationContext{}
return propagation.New(
propagation.WithExtractors(tcPropagator, ccPropagator),
propagation.WithInjectors(tcPropagator, ccPropagator),
)
}
// ResetForTest restores the initial global state, for testing purposes. // ResetForTest restores the initial global state, for testing purposes.
func ResetForTest() { func ResetForTest() {
globalTracer = defaultTracerValue() globalTracer = defaultTracerValue()
globalMeter = defaultMeterValue() globalMeter = defaultMeterValue()
globalPropagators = defaultPropagatorsValue()
delegateMeterOnce = sync.Once{} delegateMeterOnce = sync.Once{}
delegateTraceOnce = sync.Once{} delegateTraceOnce = sync.Once{}
} }

View File

@@ -53,24 +53,25 @@ var _ propagation.HTTPPropagator = B3{}
func (b3 B3) Inject(ctx context.Context, supplier propagation.HTTPSupplier) { func (b3 B3) Inject(ctx context.Context, supplier propagation.HTTPSupplier) {
sc := SpanFromContext(ctx).SpanContext() sc := SpanFromContext(ctx).SpanContext()
if sc.IsValid() { if !sc.IsValid() {
if b3.SingleHeader { return
sampled := sc.TraceFlags & core.TraceFlagsSampled }
supplier.Set(B3SingleHeader, if b3.SingleHeader {
fmt.Sprintf("%s-%.16x-%.1d", sc.TraceIDString(), sc.SpanID, sampled)) sampled := sc.TraceFlags & core.TraceFlagsSampled
} else { supplier.Set(B3SingleHeader,
supplier.Set(B3TraceIDHeader, sc.TraceIDString()) fmt.Sprintf("%s-%.16x-%.1d", sc.TraceIDString(), sc.SpanID, sampled))
supplier.Set(B3SpanIDHeader, } else {
fmt.Sprintf("%.16x", sc.SpanID)) supplier.Set(B3TraceIDHeader, sc.TraceIDString())
supplier.Set(B3SpanIDHeader,
fmt.Sprintf("%.16x", sc.SpanID))
var sampled string var sampled string
if sc.IsSampled() { if sc.IsSampled() {
sampled = "1" sampled = "1"
} else { } else {
sampled = "0" sampled = "0"
}
supplier.Set(B3SampledHeader, sampled)
} }
supplier.Set(B3SampledHeader, sampled)
} }
} }

View File

@@ -45,14 +45,15 @@ func DefaultHTTPPropagator() propagation.HTTPPropagator {
func (TraceContext) Inject(ctx context.Context, supplier propagation.HTTPSupplier) { func (TraceContext) Inject(ctx context.Context, supplier propagation.HTTPSupplier) {
sc := SpanFromContext(ctx).SpanContext() sc := SpanFromContext(ctx).SpanContext()
if sc.IsValid() { if !sc.IsValid() {
h := fmt.Sprintf("%.2x-%s-%.16x-%.2x", return
supportedVersion,
sc.TraceIDString(),
sc.SpanID,
sc.TraceFlags&core.TraceFlagsSampled)
supplier.Set(traceparentHeader, h)
} }
h := fmt.Sprintf("%.2x-%s-%.16x-%.2x",
supportedVersion,
sc.TraceIDString(),
sc.SpanID,
sc.TraceFlags&core.TraceFlagsSampled)
supplier.Set(traceparentHeader, h)
} }
func (tc TraceContext) Extract(ctx context.Context, supplier propagation.HTTPSupplier) context.Context { func (tc TraceContext) Extract(ctx context.Context, supplier propagation.HTTPSupplier) context.Context {

View File

@@ -21,19 +21,11 @@ import (
"go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/correlation" "go.opentelemetry.io/otel/api/correlation"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/propagation" "go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/api/trace"
) )
var (
tcPropagator = trace.DefaultHTTPPropagator()
ccPropagator = correlation.DefaultHTTPPropagator()
propagators = propagation.New(
propagation.WithInjectors(tcPropagator, ccPropagator),
propagation.WithExtractors(tcPropagator, ccPropagator),
)
)
type metadataSupplier struct { type metadataSupplier struct {
metadata *metadata.MD metadata *metadata.MD
} }
@@ -54,7 +46,7 @@ func (s *metadataSupplier) Set(key string, value string) {
// metadata object. This function is meant to be used on outgoing // metadata object. This function is meant to be used on outgoing
// requests. // requests.
func Inject(ctx context.Context, metadata *metadata.MD) { func Inject(ctx context.Context, metadata *metadata.MD) {
propagation.InjectHTTP(ctx, propagators, &metadataSupplier{ propagation.InjectHTTP(ctx, global.Propagators(), &metadataSupplier{
metadata: metadata, metadata: metadata,
}) })
} }
@@ -63,7 +55,7 @@ func Inject(ctx context.Context, metadata *metadata.MD) {
// another service encoded in the gRPC metadata object with Inject. // another service encoded in the gRPC metadata object with Inject.
// This function is meant to be used on incoming requests. // This function is meant to be used on incoming requests.
func Extract(ctx context.Context, metadata *metadata.MD) ([]core.KeyValue, core.SpanContext) { func Extract(ctx context.Context, metadata *metadata.MD) ([]core.KeyValue, core.SpanContext) {
ctx = propagation.ExtractHTTP(ctx, propagators, &metadataSupplier{ ctx = propagation.ExtractHTTP(ctx, global.Propagators(), &metadataSupplier{
metadata: metadata, metadata: metadata,
}) })

View File

@@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/correlation" "go.opentelemetry.io/otel/api/correlation"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/key" "go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/propagation" "go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/api/trace"
@@ -28,18 +29,11 @@ import (
var ( var (
HostKey = key.New("http.host") HostKey = key.New("http.host")
URLKey = key.New("http.url") URLKey = key.New("http.url")
tcPropagator = trace.DefaultHTTPPropagator()
ccPropagator = correlation.DefaultHTTPPropagator()
propagators = propagation.New(
propagation.WithInjectors(tcPropagator, ccPropagator),
propagation.WithExtractors(tcPropagator, ccPropagator),
)
) )
// Returns the Attributes, Context Entries, and SpanContext that were encoded by Inject. // Returns the Attributes, Context Entries, and SpanContext that were encoded by Inject.
func Extract(ctx context.Context, req *http.Request) ([]core.KeyValue, []core.KeyValue, core.SpanContext) { func Extract(ctx context.Context, req *http.Request) ([]core.KeyValue, []core.KeyValue, core.SpanContext) {
ctx = propagation.ExtractHTTP(ctx, propagators, req.Header) ctx = propagation.ExtractHTTP(ctx, global.Propagators(), req.Header)
attrs := []core.KeyValue{ attrs := []core.KeyValue{
URLKey.String(req.URL.String()), URLKey.String(req.URL.String()),
@@ -56,5 +50,5 @@ func Extract(ctx context.Context, req *http.Request) ([]core.KeyValue, []core.Ke
} }
func Inject(ctx context.Context, req *http.Request) { func Inject(ctx context.Context, req *http.Request) {
propagation.InjectHTTP(ctx, propagators, req.Header) propagation.InjectHTTP(ctx, global.Propagators(), req.Header)
} }

View File

@@ -19,7 +19,6 @@ import (
"net/http" "net/http"
"go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/correlation"
"go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/propagation" "go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/trace" "go.opentelemetry.io/otel/api/trace"
@@ -78,8 +77,8 @@ func WithPublicEndpoint() Option {
} }
// WithPropagators configures the Handler with specific propagators. If this // WithPropagators configures the Handler with specific propagators. If this
// option isn't specified then Propagators with // option isn't specified then
// go.opentelemetry.io/otel/api/trace.DefaultHTTPPropagator are used. // go.opentelemetry.io/otel/api/global.Propagators are used.
func WithPropagators(ps propagation.Propagators) Option { func WithPropagators(ps propagation.Propagators) Option {
return func(h *Handler) { return func(h *Handler) {
h.props = ps h.props = ps
@@ -128,15 +127,9 @@ func WithMessageEvents(events ...event) Option {
// named after the operation and with any provided HandlerOptions. // named after the operation and with any provided HandlerOptions.
func NewHandler(handler http.Handler, operation string, opts ...Option) http.Handler { func NewHandler(handler http.Handler, operation string, opts ...Option) http.Handler {
h := Handler{handler: handler, operation: operation} h := Handler{handler: handler, operation: operation}
tcPropagator := trace.DefaultHTTPPropagator()
ccPropagator := correlation.DefaultHTTPPropagator()
props := propagation.New(
propagation.WithInjectors(tcPropagator, ccPropagator),
propagation.WithExtractors(tcPropagator, ccPropagator),
)
defaultOpts := []Option{ defaultOpts := []Option{
WithTracer(global.TraceProvider().Tracer("go.opentelemetry.io/plugin/othttp")), WithTracer(global.TraceProvider().Tracer("go.opentelemetry.io/plugin/othttp")),
WithPropagators(props), WithPropagators(global.Propagators()),
WithSpanOptions(trace.WithSpanKind(trace.SpanKindServer)), WithSpanOptions(trace.WithSpanKind(trace.SpanKindServer)),
} }
@@ -150,7 +143,6 @@ func NewHandler(handler http.Handler, operation string, opts ...Option) http.Han
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
opts := append([]trace.StartOption{}, h.spanStartOptions...) // start with the configured options opts := append([]trace.StartOption{}, h.spanStartOptions...) // start with the configured options
// TODO: do something with the correlation context
ctx := propagation.ExtractHTTP(r.Context(), h.props, r.Header) ctx := propagation.ExtractHTTP(r.Context(), h.props, r.Header)
ctx, span := h.tracer.Start(ctx, h.operation, opts...) ctx, span := h.tracer.Start(ctx, h.operation, opts...)
defer span.End() defer span.End()
@@ -215,8 +207,7 @@ func setAfterServeAttributes(span trace.Span, read, wrote, statusCode int64, rer
func WithRouteTag(route string, h http.Handler) http.Handler { func WithRouteTag(route string, h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
span := trace.SpanFromContext(r.Context()) span := trace.SpanFromContext(r.Context())
//TODO: Why doesn't tag.Upsert work?
span.SetAttributes(RouteKey.String(route)) span.SetAttributes(RouteKey.String(route))
h.ServeHTTP(w, r.WithContext(trace.ContextWithSpan(r.Context(), span))) h.ServeHTTP(w, r)
}) })
} }