mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2024-12-04 09:43:23 +02:00
handle multiples connect start/done trace (#283)
* handle race condition: close handled before open * add dummy noop span. * add test for clienttrace * accept multiples `ConnectStart`/`ConnectDone` trace hooks. * formatting * change argument name and add error/attributes to end method. * add tests for connection race condition
This commit is contained in:
parent
50a419f477
commit
ecf3bb9d7c
@ -22,26 +22,7 @@ import (
|
||||
|
||||
// Client
|
||||
func W3C(ctx context.Context, req *http.Request) (context.Context, *http.Request) {
|
||||
t := newClientTracer(ctx)
|
||||
|
||||
t.GetConn = t.getConn
|
||||
t.GotConn = t.gotConn
|
||||
t.PutIdleConn = t.putIdleConn
|
||||
t.GotFirstResponseByte = t.gotFirstResponseByte
|
||||
t.Got100Continue = t.got100Continue
|
||||
t.Got1xxResponse = t.got1xxResponse
|
||||
t.DNSStart = t.dnsStart
|
||||
t.DNSDone = t.dnsDone
|
||||
t.ConnectStart = t.connectStart
|
||||
t.ConnectDone = t.connectDone
|
||||
t.TLSHandshakeStart = t.tlsHandshakeStart
|
||||
t.TLSHandshakeDone = t.tlsHandshakeDone
|
||||
t.WroteHeaderField = t.wroteHeaderField
|
||||
t.WroteHeaders = t.wroteHeaders
|
||||
t.Wait100Continue = t.wait100Continue
|
||||
t.WroteRequest = t.wroteRequest
|
||||
|
||||
ctx = httptrace.WithClientTrace(ctx, &t.ClientTrace)
|
||||
ctx = httptrace.WithClientTrace(ctx, NewClientTrace(ctx))
|
||||
req = req.WithContext(ctx)
|
||||
return ctx, req
|
||||
}
|
||||
|
@ -17,7 +17,6 @@ package httptrace
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net/http/httptrace"
|
||||
"net/textproto"
|
||||
"strings"
|
||||
@ -41,108 +40,139 @@ var (
|
||||
|
||||
type clientTracer struct {
|
||||
context.Context
|
||||
httptrace.ClientTrace
|
||||
|
||||
tr trace.Tracer
|
||||
|
||||
levels map[string]trace.Span
|
||||
root trace.Span
|
||||
mtx sync.Mutex
|
||||
activeHooks map[string]trace.Span
|
||||
root trace.Span
|
||||
mtx sync.Mutex
|
||||
}
|
||||
|
||||
func newClientTracer(ctx context.Context) *clientTracer {
|
||||
func NewClientTrace(ctx context.Context) *httptrace.ClientTrace {
|
||||
ct := &clientTracer{
|
||||
Context: ctx,
|
||||
levels: make(map[string]trace.Span),
|
||||
Context: ctx,
|
||||
activeHooks: make(map[string]trace.Span),
|
||||
}
|
||||
|
||||
ct.tr = global.TraceProvider().GetTracer("go.opentelemetry.io/otel/plugin/httptrace")
|
||||
ct.open("http.request")
|
||||
return ct
|
||||
ct.start("http.request", "http.request")
|
||||
|
||||
return &httptrace.ClientTrace{
|
||||
GetConn: ct.getConn,
|
||||
GotConn: ct.gotConn,
|
||||
PutIdleConn: ct.putIdleConn,
|
||||
GotFirstResponseByte: ct.gotFirstResponseByte,
|
||||
Got100Continue: ct.got100Continue,
|
||||
Got1xxResponse: ct.got1xxResponse,
|
||||
DNSStart: ct.dnsStart,
|
||||
DNSDone: ct.dnsDone,
|
||||
ConnectStart: ct.connectStart,
|
||||
ConnectDone: ct.connectDone,
|
||||
TLSHandshakeStart: ct.tlsHandshakeStart,
|
||||
TLSHandshakeDone: ct.tlsHandshakeDone,
|
||||
WroteHeaderField: ct.wroteHeaderField,
|
||||
WroteHeaders: ct.wroteHeaders,
|
||||
Wait100Continue: ct.wait100Continue,
|
||||
WroteRequest: ct.wroteRequest,
|
||||
}
|
||||
}
|
||||
|
||||
func (ct *clientTracer) open(name string, attrs ...core.KeyValue) {
|
||||
_, sp := ct.tr.Start(ct.Context, name, trace.WithAttributes(attrs...), trace.WithSpanKind(trace.SpanKindClient))
|
||||
func (ct *clientTracer) start(hook, spanName string, attrs ...core.KeyValue) {
|
||||
_, sp := ct.tr.Start(ct.Context, spanName, trace.WithAttributes(attrs...), trace.WithSpanKind(trace.SpanKindClient))
|
||||
// TODO(paivagustavo): remove this for loop when `trace.WithAttributes(attrs...)` works.
|
||||
for _, attr := range attrs {
|
||||
sp.SetAttribute(attr)
|
||||
}
|
||||
ct.mtx.Lock()
|
||||
defer ct.mtx.Unlock()
|
||||
if ct.root == nil {
|
||||
ct.root = sp
|
||||
}
|
||||
ct.levels[name] = sp
|
||||
}
|
||||
|
||||
func (ct *clientTracer) close(name string) {
|
||||
ct.mtx.Lock()
|
||||
defer ct.mtx.Unlock()
|
||||
if s, ok := ct.levels[name]; ok {
|
||||
s.End()
|
||||
delete(ct.levels, name)
|
||||
if _, ok := ct.activeHooks[hook]; ok {
|
||||
// end was called before start is handled.
|
||||
sp.End()
|
||||
delete(ct.activeHooks, hook)
|
||||
} else {
|
||||
panic(fmt.Sprintf("failed to find span %s in levels.", name))
|
||||
ct.activeHooks[hook] = sp
|
||||
}
|
||||
}
|
||||
|
||||
func (ct *clientTracer) span(name string) trace.Span {
|
||||
func (ct *clientTracer) end(hook string, err error, attrs ...core.KeyValue) {
|
||||
ct.mtx.Lock()
|
||||
defer ct.mtx.Unlock()
|
||||
return ct.levels[name]
|
||||
if span, ok := ct.activeHooks[hook]; ok {
|
||||
if err != nil {
|
||||
span.SetStatus(codes.Unknown)
|
||||
span.SetAttribute(MessageKey.String(err.Error()))
|
||||
}
|
||||
span.SetAttributes(attrs...)
|
||||
span.End()
|
||||
delete(ct.activeHooks, hook)
|
||||
} else {
|
||||
// start is not finished before end is called.
|
||||
ct.activeHooks[hook] = trace.NoopSpan{}
|
||||
}
|
||||
}
|
||||
|
||||
func (ct *clientTracer) span(hook string) trace.Span {
|
||||
ct.mtx.Lock()
|
||||
defer ct.mtx.Unlock()
|
||||
return ct.activeHooks[hook]
|
||||
}
|
||||
|
||||
func (ct *clientTracer) getConn(host string) {
|
||||
ct.open("http.getconn", HostKey.String(host))
|
||||
ct.start("http.getconn", "http.getconn", HostKey.String(host))
|
||||
}
|
||||
|
||||
func (ct *clientTracer) gotConn(info httptrace.GotConnInfo) {
|
||||
ct.span("http.getconn").SetAttribute(HTTPRemoteAddr.String(info.Conn.RemoteAddr().String()))
|
||||
ct.span("http.getconn").SetAttribute(HTTPLocalAddr.String(info.Conn.LocalAddr().String()))
|
||||
|
||||
ct.close("http.getconn")
|
||||
ct.end("http.getconn",
|
||||
nil,
|
||||
HTTPRemoteAddr.String(info.Conn.RemoteAddr().String()),
|
||||
HTTPLocalAddr.String(info.Conn.LocalAddr().String()),
|
||||
)
|
||||
}
|
||||
|
||||
func (ct *clientTracer) putIdleConn(err error) {
|
||||
if err != nil {
|
||||
ct.span("http.receive").SetAttribute(MessageKey.String(err.Error()))
|
||||
ct.span("http.receive").SetStatus(codes.Unknown)
|
||||
}
|
||||
ct.close("http.receive")
|
||||
ct.end("http.receive", err)
|
||||
}
|
||||
|
||||
func (ct *clientTracer) gotFirstResponseByte() {
|
||||
ct.open("http.receive")
|
||||
ct.start("http.receive", "http.receive")
|
||||
}
|
||||
|
||||
func (ct *clientTracer) dnsStart(httptrace.DNSStartInfo) {
|
||||
ct.open("http.dns")
|
||||
func (ct *clientTracer) dnsStart(info httptrace.DNSStartInfo) {
|
||||
ct.start("http.dns", "http.dns", HostKey.String(info.Host))
|
||||
}
|
||||
|
||||
func (ct *clientTracer) dnsDone(httptrace.DNSDoneInfo) {
|
||||
ct.close("http.dns")
|
||||
func (ct *clientTracer) dnsDone(info httptrace.DNSDoneInfo) {
|
||||
ct.end("http.dns", info.Err)
|
||||
}
|
||||
|
||||
func (ct *clientTracer) connectStart(network, addr string) {
|
||||
ct.open("http.connect")
|
||||
ct.start("http.connect."+addr, "http.connect", HTTPRemoteAddr.String(addr))
|
||||
}
|
||||
|
||||
func (ct *clientTracer) connectDone(network, addr string, err error) {
|
||||
ct.close("http.connect")
|
||||
ct.end("http.connect."+addr, err)
|
||||
}
|
||||
|
||||
func (ct *clientTracer) tlsHandshakeStart() {
|
||||
ct.open("http.tls")
|
||||
ct.start("http.tls", "http.tls")
|
||||
}
|
||||
|
||||
func (ct *clientTracer) tlsHandshakeDone(tls.ConnectionState, error) {
|
||||
ct.close("http.tls")
|
||||
func (ct *clientTracer) tlsHandshakeDone(_ tls.ConnectionState, err error) {
|
||||
ct.end("http.tls", err)
|
||||
}
|
||||
|
||||
func (ct *clientTracer) wroteHeaderField(k string, v []string) {
|
||||
if ct.span("http.headers") == nil {
|
||||
ct.open("http.headers")
|
||||
ct.start("http.headers", "http.headers")
|
||||
}
|
||||
ct.root.SetAttribute(key.New("http." + strings.ToLower(k)).String(sa2s(v)))
|
||||
ct.root.SetAttribute(key.String("http."+strings.ToLower(k), sliceToString(v)))
|
||||
}
|
||||
|
||||
func (ct *clientTracer) wroteHeaders() {
|
||||
ct.open("http.send")
|
||||
ct.start("http.send", "http.send")
|
||||
}
|
||||
|
||||
func (ct *clientTracer) wroteRequest(info httptrace.WroteRequestInfo) {
|
||||
@ -150,7 +180,7 @@ func (ct *clientTracer) wroteRequest(info httptrace.WroteRequestInfo) {
|
||||
ct.root.SetAttribute(MessageKey.String(info.Err.Error()))
|
||||
ct.root.SetStatus(codes.Unknown)
|
||||
}
|
||||
ct.close("http.send")
|
||||
ct.end("http.send", info.Err)
|
||||
}
|
||||
|
||||
func (ct *clientTracer) got100Continue() {
|
||||
@ -169,10 +199,8 @@ func (ct *clientTracer) got1xxResponse(code int, header textproto.MIMEHeader) er
|
||||
return nil
|
||||
}
|
||||
|
||||
func sa2s(value []string) string {
|
||||
if len(value) == 1 {
|
||||
return value[0]
|
||||
} else if len(value) == 0 {
|
||||
func sliceToString(value []string) string {
|
||||
if len(value) == 0 {
|
||||
return "undefined"
|
||||
}
|
||||
return strings.Join(value, ",")
|
||||
@ -186,7 +214,7 @@ func sm2s(value map[string][]string) string {
|
||||
}
|
||||
buf.WriteString(k)
|
||||
buf.WriteString("=")
|
||||
buf.WriteString(sa2s(v))
|
||||
buf.WriteString(sliceToString(v))
|
||||
}
|
||||
return buf.String()
|
||||
}
|
||||
|
265
plugin/httptrace/clienttrace_test.go
Normal file
265
plugin/httptrace/clienttrace_test.go
Normal file
@ -0,0 +1,265 @@
|
||||
// Copyright 2019, OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
package httptrace_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
"go.opentelemetry.io/otel/api/core"
|
||||
"go.opentelemetry.io/otel/api/key"
|
||||
"go.opentelemetry.io/otel/global"
|
||||
"go.opentelemetry.io/otel/plugin/httptrace"
|
||||
"go.opentelemetry.io/otel/sdk/export"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
)
|
||||
|
||||
type testExporter struct {
|
||||
mu sync.Mutex
|
||||
spanMap map[string][]*export.SpanData
|
||||
}
|
||||
|
||||
func (t *testExporter) ExportSpan(ctx context.Context, s *export.SpanData) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
var spans []*export.SpanData
|
||||
var ok bool
|
||||
|
||||
if spans, ok = t.spanMap[s.Name]; !ok {
|
||||
spans = []*export.SpanData{}
|
||||
t.spanMap[s.Name] = spans
|
||||
}
|
||||
spans = append(spans, s)
|
||||
t.spanMap[s.Name] = spans
|
||||
}
|
||||
|
||||
var _ export.SpanSyncer = (*testExporter)(nil)
|
||||
|
||||
func TestHTTPRequestWithClientTrace(t *testing.T) {
|
||||
exp := &testExporter{
|
||||
spanMap: make(map[string][]*export.SpanData),
|
||||
}
|
||||
tp, _ := sdktrace.NewProvider(sdktrace.WithSyncer(exp), sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}))
|
||||
global.SetTraceProvider(tp)
|
||||
|
||||
tr := tp.GetTracer("httptrace/client")
|
||||
|
||||
// Mock http server
|
||||
ts := httptest.NewServer(
|
||||
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
}),
|
||||
)
|
||||
defer ts.Close()
|
||||
address := ts.Listener.Addr()
|
||||
|
||||
client := ts.Client()
|
||||
err := tr.WithSpan(context.Background(), "test",
|
||||
func(ctx context.Context) error {
|
||||
req, _ := http.NewRequest("GET", ts.URL, nil)
|
||||
_, req = httptrace.W3C(ctx, req)
|
||||
|
||||
res, err := client.Do(req)
|
||||
if err != nil {
|
||||
t.Fatalf("Request failed: %s", err.Error())
|
||||
}
|
||||
_ = res.Body.Close()
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
panic("unexpected error in http request: " + err.Error())
|
||||
}
|
||||
|
||||
testLen := []struct {
|
||||
name string
|
||||
attributes []core.KeyValue
|
||||
}{
|
||||
{
|
||||
name: "go.opentelemetry.io/otel/plugin/httptrace/http.connect",
|
||||
attributes: []core.KeyValue{key.String("http.remote", address.String())},
|
||||
},
|
||||
{
|
||||
name: "go.opentelemetry.io/otel/plugin/httptrace/http.getconn",
|
||||
attributes: []core.KeyValue{
|
||||
key.String("http.remote", address.String()),
|
||||
key.String("http.host", address.String()),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "go.opentelemetry.io/otel/plugin/httptrace/http.receive",
|
||||
},
|
||||
{
|
||||
name: "go.opentelemetry.io/otel/plugin/httptrace/http.send",
|
||||
},
|
||||
{
|
||||
name: "httptrace/client/test",
|
||||
},
|
||||
}
|
||||
for _, tl := range testLen {
|
||||
spans, ok := exp.spanMap[tl.name]
|
||||
if !ok {
|
||||
t.Fatalf("no spans found with the name %s, %v", tl.name, exp.spanMap)
|
||||
}
|
||||
|
||||
if len(spans) != 1 {
|
||||
t.Fatalf("Expected exactly one span for %s but found %d", tl.name, len(spans))
|
||||
}
|
||||
span := spans[0]
|
||||
|
||||
actualAttrs := make(map[core.Key]string)
|
||||
for _, attr := range span.Attributes {
|
||||
actualAttrs[attr.Key] = attr.Value.Emit()
|
||||
}
|
||||
|
||||
expectedAttrs := make(map[core.Key]string)
|
||||
for _, attr := range tl.attributes {
|
||||
expectedAttrs[attr.Key] = attr.Value.Emit()
|
||||
}
|
||||
|
||||
if tl.name == "go.opentelemetry.io/otel/plugin/httptrace/http.getconn" {
|
||||
local := key.New("http.local")
|
||||
// http.local attribute is not deterministic, just make sure it exists for `getconn`.
|
||||
if _, ok := actualAttrs[local]; ok {
|
||||
delete(actualAttrs, local)
|
||||
} else {
|
||||
t.Fatalf("[span %s] is missing attribute %v", tl.name, local)
|
||||
}
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(actualAttrs, expectedAttrs); diff != "" {
|
||||
t.Fatalf("[span %s] Attributes are different: %v", tl.name, diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrentConnectionStart(t *testing.T) {
|
||||
exp := &testExporter{
|
||||
spanMap: make(map[string][]*export.SpanData),
|
||||
}
|
||||
tp, _ := sdktrace.NewProvider(sdktrace.WithSyncer(exp), sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}))
|
||||
global.SetTraceProvider(tp)
|
||||
|
||||
ct := httptrace.NewClientTrace(context.Background())
|
||||
|
||||
tts := []struct {
|
||||
name string
|
||||
run func()
|
||||
}{
|
||||
{
|
||||
name: "Open1Close1Open2Close2",
|
||||
run: func() {
|
||||
exp.spanMap = make(map[string][]*export.SpanData)
|
||||
|
||||
ct.ConnectStart("tcp", "127.0.0.1:3000")
|
||||
ct.ConnectDone("tcp", "127.0.0.1:3000", nil)
|
||||
ct.ConnectStart("tcp", "[::1]:3000")
|
||||
ct.ConnectDone("tcp", "[::1]:3000", nil)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Open2Close2Open1Close1",
|
||||
run: func() {
|
||||
exp.spanMap = make(map[string][]*export.SpanData)
|
||||
|
||||
ct.ConnectStart("tcp", "[::1]:3000")
|
||||
ct.ConnectDone("tcp", "[::1]:3000", nil)
|
||||
ct.ConnectStart("tcp", "127.0.0.1:3000")
|
||||
ct.ConnectDone("tcp", "127.0.0.1:3000", nil)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Open1Open2Close1Close2",
|
||||
run: func() {
|
||||
exp.spanMap = make(map[string][]*export.SpanData)
|
||||
|
||||
ct.ConnectStart("tcp", "127.0.0.1:3000")
|
||||
ct.ConnectStart("tcp", "[::1]:3000")
|
||||
ct.ConnectDone("tcp", "127.0.0.1:3000", nil)
|
||||
ct.ConnectDone("tcp", "[::1]:3000", nil)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Open1Open2Close2Close1",
|
||||
run: func() {
|
||||
exp.spanMap = make(map[string][]*export.SpanData)
|
||||
|
||||
ct.ConnectStart("tcp", "127.0.0.1:3000")
|
||||
ct.ConnectStart("tcp", "[::1]:3000")
|
||||
ct.ConnectDone("tcp", "[::1]:3000", nil)
|
||||
ct.ConnectDone("tcp", "127.0.0.1:3000", nil)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Open2Open1Close1Close2",
|
||||
run: func() {
|
||||
exp.spanMap = make(map[string][]*export.SpanData)
|
||||
|
||||
ct.ConnectStart("tcp", "[::1]:3000")
|
||||
ct.ConnectStart("tcp", "127.0.0.1:3000")
|
||||
ct.ConnectDone("tcp", "127.0.0.1:3000", nil)
|
||||
ct.ConnectDone("tcp", "[::1]:3000", nil)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Open2Open1Close2Close1",
|
||||
run: func() {
|
||||
exp.spanMap = make(map[string][]*export.SpanData)
|
||||
|
||||
ct.ConnectStart("tcp", "[::1]:3000")
|
||||
ct.ConnectStart("tcp", "127.0.0.1:3000")
|
||||
ct.ConnectDone("tcp", "[::1]:3000", nil)
|
||||
ct.ConnectDone("tcp", "127.0.0.1:3000", nil)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tts {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
tt.run()
|
||||
spans := exp.spanMap["go.opentelemetry.io/otel/plugin/httptrace/http.connect"]
|
||||
|
||||
if l := len(spans); l != 2 {
|
||||
t.Fatalf("Expected 2 'http.connect' traces but found %d", l)
|
||||
}
|
||||
|
||||
remotes := make(map[string]struct{})
|
||||
for _, span := range spans {
|
||||
if l := len(span.Attributes); l != 1 {
|
||||
t.Fatalf("Expected 1 attribute on each span but found %d", l)
|
||||
}
|
||||
|
||||
attr := span.Attributes[0]
|
||||
if attr.Key != "http.remote" {
|
||||
t.Fatalf("Expected attribute to be 'http.remote' but found %s", attr.Key)
|
||||
}
|
||||
remotes[attr.Value.Emit()] = struct{}{}
|
||||
}
|
||||
|
||||
if l := len(remotes); l != 2 {
|
||||
t.Fatalf("Expected 2 different 'http.remote' but found %d", l)
|
||||
}
|
||||
|
||||
for _, remote := range []string{"127.0.0.1:3000", "[::1]:3000"} {
|
||||
if _, ok := remotes[remote]; !ok {
|
||||
t.Fatalf("Missing remote %s", remote)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user