From e11b90cab21548ed11b2de90c4981bcd82a3510f Mon Sep 17 00:00:00 2001 From: Steven Karis Date: Thu, 17 Oct 2019 15:08:44 -0700 Subject: [PATCH] Implement W3C Correlation Context propagator (#179) * Implement W3C Correlation Context propagator * PR comments * PR comments * Update test to inject context properly * Fix merge --- api/propagation/noop_propagator.go | 5 +- api/propagation/propagator.go | 13 +- plugin/httptrace/httptrace.go | 10 +- plugin/othttp/handler.go | 3 +- propagation/http_b3_propagator.go | 7 +- .../http_b3_propagator_benchmark_test.go | 2 +- propagation/http_b3_propagator_test.go | 2 +- propagation/http_trace_context_propagator.go | 89 +++++++- .../http_trace_context_propagator_test.go | 208 +++++++++++++++++- 9 files changed, 309 insertions(+), 30 deletions(-) diff --git a/api/propagation/noop_propagator.go b/api/propagation/noop_propagator.go index dc185c80c..9e7447921 100644 --- a/api/propagation/noop_propagator.go +++ b/api/propagation/noop_propagator.go @@ -18,6 +18,7 @@ import ( "context" "go.opentelemetry.io/api/core" + dctx "go.opentelemetry.io/api/distributedcontext" ) // NoopTextFormatPropagator implements TextFormatPropagator that does nothing. @@ -30,8 +31,8 @@ func (np NoopTextFormatPropagator) Inject(ctx context.Context, supplier Supplier } // Extract does nothing and returns an empty SpanContext -func (np NoopTextFormatPropagator) Extract(ctx context.Context, supplier Supplier) core.SpanContext { - return core.EmptySpanContext() +func (np NoopTextFormatPropagator) Extract(ctx context.Context, supplier Supplier) (core.SpanContext, dctx.Map) { + return core.EmptySpanContext(), dctx.NewEmptyMap() } // GetAllKeys returns empty list of strings. diff --git a/api/propagation/propagator.go b/api/propagation/propagator.go index d7adb7e9f..3da0d287a 100644 --- a/api/propagation/propagator.go +++ b/api/propagation/propagator.go @@ -18,22 +18,25 @@ import ( "context" "go.opentelemetry.io/api/core" + dctx "go.opentelemetry.io/api/distributedcontext" ) // TextFormatPropagator is an interface that specifies methods to inject and extract SpanContext -// into/from a carrier using Supplier interface. +// and distributed context into/from a carrier using Supplier interface. // For example, HTTP Trace Context propagator would encode SpanContext into W3C Trace // Context Header and set the header into HttpRequest. type TextFormatPropagator interface { // Inject method retrieves current SpanContext from the ctx, encodes it into propagator // specific format and then injects the encoded SpanContext using supplier into a carrier - // associated with the supplier. + // associated with the supplier. It also takes a correlationCtx whose values will be + // injected into a carrier using the supplier. Inject(ctx context.Context, supplier Supplier) // Extract method retrieves encoded SpanContext using supplier from the associated carrier. - // It decodes the SpanContext and returns it. If no SpanContext was retrieved OR - // if the retrieved SpanContext is invalid then an empty SpanContext is returned. - Extract(ctx context.Context, supplier Supplier) core.SpanContext + // It decodes the SpanContext and returns it and a dctx of correlated context. + // If no SpanContext was retrieved OR if the retrieved SpanContext is invalid then + // an empty SpanContext is returned. + Extract(ctx context.Context, supplier Supplier) (core.SpanContext, dctx.Map) // GetAllKeys returns all the keys that this propagator injects/extracts into/from a // carrier. The use cases for this are diff --git a/plugin/httptrace/httptrace.go b/plugin/httptrace/httptrace.go index 409fa568e..b6efcfd9d 100644 --- a/plugin/httptrace/httptrace.go +++ b/plugin/httptrace/httptrace.go @@ -36,14 +36,20 @@ var ( // Returns the Attributes, Context Entries, and SpanContext that were encoded by Inject. func Extract(ctx context.Context, req *http.Request) ([]core.KeyValue, []core.KeyValue, core.SpanContext) { - sc := propagator.Extract(ctx, req.Header) + sc, correlationCtx := propagator.Extract(ctx, req.Header) attrs := []core.KeyValue{ URLKey.String(req.URL.String()), // Etc. } - return attrs, nil, sc + var correlationCtxKVs []core.KeyValue + correlationCtx.Foreach(func(kv core.KeyValue) bool { + correlationCtxKVs = append(correlationCtxKVs, kv) + return true + }) + + return attrs, correlationCtxKVs, sc } func Inject(ctx context.Context, req *http.Request) { diff --git a/plugin/othttp/handler.go b/plugin/othttp/handler.go index b8d0fae5c..c62b9b79e 100644 --- a/plugin/othttp/handler.go +++ b/plugin/othttp/handler.go @@ -143,7 +143,8 @@ func NewHandler(handler http.Handler, operation string, opts ...Option) http.Han func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { opts := append([]trace.SpanOption{}, h.spanOptions...) // start with the configured options - sc := h.prop.Extract(r.Context(), r.Header) + // TODO: do something with the correlation context + sc, _ := h.prop.Extract(r.Context(), r.Header) if sc.IsValid() { // not a valid span context, so no link / parent relationship to establish var opt trace.SpanOption if h.public { diff --git a/propagation/http_b3_propagator.go b/propagation/http_b3_propagator.go index e378bed70..90fca8605 100644 --- a/propagation/http_b3_propagator.go +++ b/propagation/http_b3_propagator.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/api/trace" "go.opentelemetry.io/api/core" + dctx "go.opentelemetry.io/api/distributedcontext" apipropagation "go.opentelemetry.io/api/propagation" ) @@ -84,11 +85,11 @@ func (b3 HTTPB3Propagator) Inject(ctx context.Context, supplier apipropagation.S } // Extract retrieves B3 Headers from the supplier -func (b3 HTTPB3Propagator) Extract(ctx context.Context, supplier apipropagation.Supplier) core.SpanContext { +func (b3 HTTPB3Propagator) Extract(ctx context.Context, supplier apipropagation.Supplier) (core.SpanContext, dctx.Map) { if b3.SingleHeader { - return b3.extractSingleHeader(supplier) + return b3.extractSingleHeader(supplier), dctx.NewEmptyMap() } - return b3.extract(supplier) + return b3.extract(supplier), dctx.NewEmptyMap() } func (b3 HTTPB3Propagator) GetAllKeys() []string { diff --git a/propagation/http_b3_propagator_benchmark_test.go b/propagation/http_b3_propagator_benchmark_test.go index beed63df3..52169b63c 100644 --- a/propagation/http_b3_propagator_benchmark_test.go +++ b/propagation/http_b3_propagator_benchmark_test.go @@ -65,7 +65,7 @@ func BenchmarkExtractB3(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - _ = propagator.Extract(ctx, req.Header) + _, _ = propagator.Extract(ctx, req.Header) } }) } diff --git a/propagation/http_b3_propagator_test.go b/propagation/http_b3_propagator_test.go index 68baa8fc5..8e0e386d3 100644 --- a/propagation/http_b3_propagator_test.go +++ b/propagation/http_b3_propagator_test.go @@ -65,7 +65,7 @@ func TestExtractB3(t *testing.T) { } ctx := context.Background() - gotSc := propagator.Extract(ctx, req.Header) + gotSc, _ := propagator.Extract(ctx, req.Header) if diff := cmp.Diff(gotSc, tt.wantSc); diff != "" { t.Errorf("%s: %s: -got +want %s", tg.name, tt.name, diff) } diff --git a/propagation/http_trace_context_propagator.go b/propagation/http_trace_context_propagator.go index f1ebfce4f..1cf79292f 100644 --- a/propagation/http_trace_context_propagator.go +++ b/propagation/http_trace_context_propagator.go @@ -18,20 +18,23 @@ import ( "context" "encoding/hex" "fmt" + "net/url" "regexp" "strconv" "strings" - "go.opentelemetry.io/api/trace" - "go.opentelemetry.io/api/core" + dctx "go.opentelemetry.io/api/distributedcontext" + "go.opentelemetry.io/api/key" apipropagation "go.opentelemetry.io/api/propagation" + "go.opentelemetry.io/api/trace" ) const ( - supportedVersion = 0 - maxVersion = 254 - TraceparentHeader = "Traceparent" + supportedVersion = 0 + maxVersion = 254 + TraceparentHeader = "Traceparent" + CorrelationContextHeader = "Correlation-Context" ) // HTTPTraceContextPropagator propagates SpanContext in W3C TraceContext format. @@ -51,9 +54,35 @@ func (hp HTTPTraceContextPropagator) Inject(ctx context.Context, supplier apipro sc.TraceFlags&core.TraceFlagsSampled) supplier.Set(TraceparentHeader, h) } + + correlationCtx := dctx.FromContext(ctx) + firstIter := true + var headerValueBuilder strings.Builder + correlationCtx.Foreach(func(kv core.KeyValue) bool { + if !firstIter { + headerValueBuilder.WriteRune(',') + } + firstIter = false + headerValueBuilder.WriteString(url.QueryEscape(strings.TrimSpace((string)(kv.Key)))) + headerValueBuilder.WriteRune('=') + headerValueBuilder.WriteString(url.QueryEscape(strings.TrimSpace(kv.Value.Emit()))) + return true + }) + if headerValueBuilder.Len() > 0 { + headerString := headerValueBuilder.String() + supplier.Set(CorrelationContextHeader, headerString) + } } -func (hp HTTPTraceContextPropagator) Extract(ctx context.Context, supplier apipropagation.Supplier) core.SpanContext { +func (hp HTTPTraceContextPropagator) Extract( + ctx context.Context, supplier apipropagation.Supplier, +) (core.SpanContext, dctx.Map) { + return hp.extractSpanContext(ctx, supplier), hp.extractCorrelationCtx(ctx, supplier) +} + +func (hp HTTPTraceContextPropagator) extractSpanContext( + ctx context.Context, supplier apipropagation.Supplier, +) core.SpanContext { h := supplier.Get(TraceparentHeader) if h == "" { return core.EmptySpanContext() @@ -128,6 +157,50 @@ func (hp HTTPTraceContextPropagator) Extract(ctx context.Context, supplier apipr return sc } -func (hp HTTPTraceContextPropagator) GetAllKeys() []string { - return []string{TraceparentHeader} +func (hp HTTPTraceContextPropagator) extractCorrelationCtx(ctx context.Context, supplier apipropagation.Supplier) dctx.Map { + correlationContext := supplier.Get(CorrelationContextHeader) + if correlationContext == "" { + return dctx.NewEmptyMap() + } + + contextValues := strings.Split(correlationContext, ",") + keyValues := make([]core.KeyValue, 0, len(contextValues)) + for _, contextValue := range contextValues { + valueAndProps := strings.Split(contextValue, ";") + if len(valueAndProps) < 1 { + continue + } + nameValue := strings.Split(valueAndProps[0], "=") + if len(nameValue) < 2 { + continue + } + name, err := url.QueryUnescape(nameValue[0]) + if err != nil { + continue + } + trimmedName := strings.TrimSpace(name) + value, err := url.QueryUnescape(nameValue[1]) + if err != nil { + continue + } + trimmedValue := strings.TrimSpace(value) + + // TODO (skaris): properties defiend https://w3c.github.io/correlation-context/, are currently + // just put as part of the value. + var trimmedValueWithProps strings.Builder + trimmedValueWithProps.WriteString(trimmedValue) + for _, prop := range valueAndProps[1:] { + trimmedValueWithProps.WriteRune(';') + trimmedValueWithProps.WriteString(prop) + } + + keyValues = append(keyValues, key.New(trimmedName).String(trimmedValueWithProps.String())) + } + return dctx.NewMap(dctx.MapUpdate{ + MultiKV: keyValues, + }) +} + +func (hp HTTPTraceContextPropagator) GetAllKeys() []string { + return []string{TraceparentHeader, CorrelationContextHeader} } diff --git a/propagation/http_trace_context_propagator_test.go b/propagation/http_trace_context_propagator_test.go index 4d7aa5757..536a9f843 100644 --- a/propagation/http_trace_context_propagator_test.go +++ b/propagation/http_trace_context_propagator_test.go @@ -17,13 +17,15 @@ package propagation_test import ( "context" "net/http" + "strings" "testing" - "go.opentelemetry.io/api/trace" - "github.com/google/go-cmp/cmp" "go.opentelemetry.io/api/core" + dctx "go.opentelemetry.io/api/distributedcontext" + "go.opentelemetry.io/api/key" + "go.opentelemetry.io/api/trace" mocktrace "go.opentelemetry.io/internal/trace" "go.opentelemetry.io/propagation" ) @@ -42,7 +44,7 @@ func TestExtractValidTraceContextFromHTTPReq(t *testing.T) { wantSc core.SpanContext }{ { - name: "valid b3Header", + name: "valid w3cHeader", header: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00", wantSc: core.SpanContext{ TraceID: traceID, @@ -50,7 +52,7 @@ func TestExtractValidTraceContextFromHTTPReq(t *testing.T) { }, }, { - name: "valid b3Header and sampled", + name: "valid w3cHeader and sampled", header: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", wantSc: core.SpanContext{ TraceID: traceID, @@ -119,7 +121,7 @@ func TestExtractValidTraceContextFromHTTPReq(t *testing.T) { req.Header.Set("traceparent", tt.header) ctx := context.Background() - gotSc := propagator.Extract(ctx, req.Header) + gotSc, _ := propagator.Extract(ctx, req.Header) if diff := cmp.Diff(gotSc, tt.wantSc); diff != "" { t.Errorf("Extract Tracecontext: %s: -got +want %s", tt.name, diff) } @@ -207,7 +209,7 @@ func TestExtractInvalidTraceContextFromHTTPReq(t *testing.T) { req.Header.Set("traceparent", tt.header) ctx := context.Background() - gotSc := propagator.Extract(ctx, req.Header) + gotSc, _ := propagator.Extract(ctx, req.Header) if diff := cmp.Diff(gotSc, wantSc); diff != "" { t.Errorf("Extract Tracecontext: %s: -got +want %s", tt.name, diff) } @@ -276,9 +278,201 @@ func TestInjectTraceContextToHTTPReq(t *testing.T) { } } +func TestExtractValidDistributedContextFromHTTPReq(t *testing.T) { + trace.SetGlobalTracer(&mocktrace.MockTracer{}) + propagator := propagation.HTTPTraceContextPropagator{} + tests := []struct { + name string + header string + wantKVs []core.KeyValue + }{ + { + name: "valid w3cHeader", + header: "key1=val1,key2=val2", + wantKVs: []core.KeyValue{ + key.New("key1").String("val1"), + key.New("key2").String("val2"), + }, + }, + { + name: "valid w3cHeader with spaces", + header: "key1 = val1, key2 =val2 ", + wantKVs: []core.KeyValue{ + key.New("key1").String("val1"), + key.New("key2").String("val2"), + }, + }, + { + name: "valid w3cHeader with properties", + header: "key1=val1,key2=val2;prop=1", + wantKVs: []core.KeyValue{ + key.New("key1").String("val1"), + key.New("key2").String("val2;prop=1"), + }, + }, + { + name: "valid header with url-escaped comma", + header: "key1=val1,key2=val2%2Cval3", + wantKVs: []core.KeyValue{ + key.New("key1").String("val1"), + key.New("key2").String("val2,val3"), + }, + }, + { + name: "valid header with an invalid header", + header: "key1=val1,key2=val2,a,val3", + wantKVs: []core.KeyValue{ + key.New("key1").String("val1"), + key.New("key2").String("val2"), + }, + }, + { + name: "valid header with no value", + header: "key1=,key2=val2", + wantKVs: []core.KeyValue{ + key.New("key1").String(""), + key.New("key2").String("val2"), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req, _ := http.NewRequest("GET", "http://example.com", nil) + req.Header.Set("Correlation-Context", tt.header) + + ctx := context.Background() + _, gotCorCtx := propagator.Extract(ctx, req.Header) + wantCorCtx := dctx.NewMap(dctx.MapUpdate{MultiKV: tt.wantKVs}) + if gotCorCtx.Len() != wantCorCtx.Len() { + t.Errorf( + "Got and Want CorCtx are not the same size %d != %d", + gotCorCtx.Len(), + wantCorCtx.Len(), + ) + } + totalDiff := "" + wantCorCtx.Foreach(func(kv core.KeyValue) bool { + val, _ := gotCorCtx.Value(kv.Key) + diff := cmp.Diff(kv, core.KeyValue{Key: kv.Key, Value: val}) + if diff != "" { + totalDiff += diff + "\n" + } + return true + }) + if totalDiff != "" { + t.Errorf("Extract Tracecontext: %s: -got +want %s", tt.name, totalDiff) + } + }) + } +} + +func TestExtractInvalidDistributedContextFromHTTPReq(t *testing.T) { + trace.SetGlobalTracer(&mocktrace.MockTracer{}) + propagator := propagation.HTTPTraceContextPropagator{} + tests := []struct { + name string + header string + }{ + { + name: "no key values", + header: "header1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req, _ := http.NewRequest("GET", "http://example.com", nil) + req.Header.Set("Correlation-Context", tt.header) + + ctx := context.Background() + _, gotCorCtx := propagator.Extract(ctx, req.Header) + if gotCorCtx.Len() != 0 { + t.Errorf("Got and Want CorCtx are not the same size %d != %d", gotCorCtx.Len(), 0) + } + }) + } +} + +func TestInjectCorrelationContextToHTTPReq(t *testing.T) { + propagator := propagation.HTTPTraceContextPropagator{} + tests := []struct { + name string + kvs []core.KeyValue + wantInHeader []string + wantedLen int + }{ + { + name: "two simple values", + kvs: []core.KeyValue{ + key.New("key1").String("val1"), + key.New("key2").String("val2"), + }, + wantInHeader: []string{"key1=val1", "key2=val2"}, + }, + { + name: "two values with escaped chars", + kvs: []core.KeyValue{ + key.New("key1").String("val1,val2"), + key.New("key2").String("val3=4"), + }, + wantInHeader: []string{"key1=val1%2Cval2", "key2=val3%3D4"}, + }, + { + name: "values of non-string types", + kvs: []core.KeyValue{ + key.New("key1").Bool(true), + key.New("key2").Int(123), + key.New("key3").Int64(123), + key.New("key4").Int32(123), + key.New("key5").Uint(123), + key.New("key6").Uint32(123), + key.New("key7").Uint64(123), + key.New("key8").Float64(123.567), + key.New("key9").Float32(123.567), + key.New("key10").Bytes([]byte{0x68, 0x69}), + }, + wantInHeader: []string{ + "key1=true", + "key2=123", + "key3=123", + "key4=123", + "key5=123", + "key6=123", + "key7=123", + "key8=123.567", + "key9=123.56700134277344", + "key10=hi", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req, _ := http.NewRequest("GET", "http://example.com", nil) + ctx := dctx.WithMap(context.Background(), dctx.NewMap(dctx.MapUpdate{MultiKV: tt.kvs})) + propagator.Inject(ctx, req.Header) + + gotHeader := req.Header.Get("Correlation-Context") + wantedLen := len(strings.Join(tt.wantInHeader, ",")) + if wantedLen != len(gotHeader) { + t.Errorf( + "%s: Inject Correlation-Context incorrect length %d != %d.", tt.name, tt.wantedLen, len(gotHeader), + ) + } + for _, inHeader := range tt.wantInHeader { + if !strings.Contains(gotHeader, inHeader) { + t.Errorf( + "%s: Inject Correlation-Context missing part of header: %s in %s", tt.name, inHeader, gotHeader, + ) + } + } + }) + } +} + func TestHttpTraceContextPropagator_GetAllKeys(t *testing.T) { var propagator propagation.HTTPTraceContextPropagator - want := []string{"Traceparent"} + want := []string{"Traceparent", "Correlation-Context"} got := propagator.GetAllKeys() if diff := cmp.Diff(got, want); diff != "" { t.Errorf("GetAllKeys: -got +want %s", diff)