diff --git a/go.mod b/go.mod index 714f2934f..d21eb3e15 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/mattn/go-colorable v0.0.9 // indirect github.com/mattn/go-isatty v0.0.4 // indirect github.com/montanaflynn/stats v0.5.0 + github.com/openzipkin/zipkin-go v0.2.0 github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v0.9.2 github.com/remyoudompheng/bigfft v0.0.0-20190321074620-2f0d2b0e0001 // indirect diff --git a/go.sum b/go.sum index b0075cdbd..72b08b006 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/aristanetworks/goarista v0.0.0-20190409234242-46f4bc7b73ef h1:ajsnF5qTstiBlP+V/mgh91zZfoKP477KfSmRoCoyYGU= github.com/aristanetworks/goarista v0.0.0-20190409234242-46f4bc7b73ef/go.mod h1:D/tb0zPVXnP7fmsLZjtdUhSsumbK/ij54UXjjVgMGxQ= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= @@ -18,6 +20,9 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-farm v0.0.0-20190323231341-8198c7b169ec h1:sElGDs3V8VdCxH5tWi0ycWJzteOPLJ3HtItSSKI95PY= github.com/dgryski/go-farm v0.0.0-20190323231341-8198c7b169ec/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= @@ -47,6 +52,7 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/sync v0.0.0-20181108010431-42b317875d0f h1:vuwODIDRvDgwjIl6VTMf0c1Z9uVMUUxiu6UPUjiGhD4= github.com/golang/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:YCHYtYb9c8Q7XgYVYjmJBPtFPKx5QvOcPxHZWjldabE= github.com/golang/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:GJexUf2QgFNvMR9sjJ1iqs+2TxZqJko+Muhnu04tPuU= @@ -61,8 +67,11 @@ github.com/google/go-genproto v0.0.0-20180817151627-c66870c02cf8 h1:I9PuChzQA31g github.com/google/go-genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:3Rcd9jSoLVkV/osPrt5CogLvLiarfI8U9/x78NwhuDU= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/googleapis/google-cloud-go v0.26.0/go.mod h1:yJoOdPPE9UpqbamBhJvp7Ur6OUPPV4rUY3RnssPGNBA= +github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= +github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/grpc/grpc-go v1.20.1 h1:pk72GtSPpOdZDTkPneppDMGW10HYPC7RqNJT/JvUpV0= github.com/grpc/grpc-go v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -80,8 +89,15 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0j github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/montanaflynn/stats v0.5.0 h1:2EkzeTSqBB4V4bJwWrt5gIIrZmpJBcoIRGS2kWLgzmk= github.com/montanaflynn/stats v0.5.0/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/openzipkin/zipkin-go v0.2.0 h1:33/f6xXB6YlOQ9tgTsXVOkdLCJsHTcZJnMy4DnSd6FU= +github.com/openzipkin/zipkin-go v0.2.0/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= +github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= @@ -92,6 +108,7 @@ github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jO github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/remyoudompheng/bigfft v0.0.0-20190321074620-2f0d2b0e0001 h1:YDeskXpkNDhPdWN3REluVa46HQOVuVkjkd2sWnrABNQ= github.com/remyoudompheng/bigfft v0.0.0-20190321074620-2f0d2b0e0001/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -103,6 +120,7 @@ github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8 github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw= github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -112,16 +130,19 @@ github.com/tsuna/gohbase v0.0.0-20190201102810-d3184c1526df h1:jYiwqXfoRWU6pJMzC github.com/tsuna/gohbase v0.0.0-20190201102810-d3184c1526df/go.mod h1:3HfLQly3YNLGxNv/2YOfmz30vcjG9hbuME1GpxoLlGs= github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/go-playground/validator.v9 v9.26.0 h1:2NPPsBpD0ZoxshmLWewQru8rWmbT5JqSzz9D1ZrAjYQ= gopkg.in/go-playground/validator.v9 v9.26.0/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/naming/discovery/discovery.go b/pkg/naming/discovery/discovery.go index f47fd9584..9197362b5 100644 --- a/pkg/naming/discovery/discovery.go +++ b/pkg/naming/discovery/discovery.go @@ -609,6 +609,10 @@ func (d *Discovery) polls(ctx context.Context) (apps map[string]*naming.Instance func (d *Discovery) broadcast(apps map[string]*naming.InstancesInfo) { for appID, v := range apps { var count int + // v maybe nil in old version(less than v1.1) discovery,check incase of panic + if v==nil { + continue + } for zone, ins := range v.Instances { if len(ins) == 0 { delete(v.Instances, zone) diff --git a/pkg/net/http/blademaster/context.go b/pkg/net/http/blademaster/context.go index 10d051f79..d9957b19e 100644 --- a/pkg/net/http/blademaster/context.go +++ b/pkg/net/http/blademaster/context.go @@ -45,6 +45,11 @@ type Context struct { method string engine *Engine + + RoutePath string + + Params Params + } /************************************/ @@ -56,21 +61,13 @@ type Context struct { // See example in godoc. func (c *Context) Next() { c.index++ - s := int8(len(c.handlers)) - for ; c.index < s; c.index++ { - // only check method on last handler, otherwise middlewares - // will never be effected if request method is not matched - if c.index == s-1 && c.method != c.Request.Method { - code := http.StatusMethodNotAllowed - c.Error = ecode.MethodNotAllowed - http.Error(c.Writer, http.StatusText(code), code) - return - } - + for c.index < int8(len(c.handlers)) { c.handlers[c.index](c) + c.index++ } } + // Abort prevents pending handlers from being called. Note that this will not stop the current handler. // Let's say you have an authorization middleware that validates that the current request is authorized. // If the authorization fails (ex: the password does not match), call Abort to ensure the remaining handlers diff --git a/pkg/net/http/blademaster/routergroup.go b/pkg/net/http/blademaster/routergroup.go index 28d09a805..9859539fc 100644 --- a/pkg/net/http/blademaster/routergroup.go +++ b/pkg/net/http/blademaster/routergroup.go @@ -99,11 +99,6 @@ func (group *RouterGroup) Handle(httpMethod, relativePath string, handlers ...Ha return group.handle(httpMethod, relativePath, handlers...) } -// HEAD is a shortcut for router.Handle("HEAD", path, handle). -func (group *RouterGroup) HEAD(relativePath string, handlers ...HandlerFunc) IRoutes { - return group.handle("HEAD", relativePath, handlers...) -} - // GET is a shortcut for router.Handle("GET", path, handle). func (group *RouterGroup) GET(relativePath string, handlers ...HandlerFunc) IRoutes { return group.handle("GET", relativePath, handlers...) @@ -124,6 +119,22 @@ func (group *RouterGroup) DELETE(relativePath string, handlers ...HandlerFunc) I return group.handle("DELETE", relativePath, handlers...) } +// PATCH is a shortcut for router.Handle("PATCH", path, handle). +func (group *RouterGroup) PATCH(relativePath string, handlers ...HandlerFunc) IRoutes { + return group.handle("PATCH", relativePath, handlers...) +} + +// OPTIONS is a shortcut for router.Handle("OPTIONS", path, handle). +func (group *RouterGroup) OPTIONS(relativePath string, handlers ...HandlerFunc) IRoutes { + return group.handle("OPTIONS", relativePath, handlers...) +} + +// HEAD is a shortcut for router.Handle("HEAD", path, handle). +func (group *RouterGroup) HEAD(relativePath string, handlers ...HandlerFunc) IRoutes { + return group.handle("HEAD", relativePath, handlers...) +} + + func (group *RouterGroup) combineHandlers(handlerGroups ...[]HandlerFunc) []HandlerFunc { finalSize := len(group.Handlers) for _, handlers := range handlerGroups { @@ -164,3 +175,19 @@ func (group *RouterGroup) injections(relativePath string) []HandlerFunc { } return nil } + +// Any registers a route that matches all the HTTP methods. +// GET, POST, PUT, PATCH, HEAD, OPTIONS, DELETE, CONNECT, TRACE. +func (group *RouterGroup) Any(relativePath string, handlers ...HandlerFunc) IRoutes { + group.handle("GET", relativePath, handlers...) + group.handle("POST", relativePath, handlers...) + group.handle("PUT", relativePath, handlers...) + group.handle("PATCH", relativePath, handlers...) + group.handle("HEAD", relativePath, handlers...) + group.handle("OPTIONS", relativePath, handlers...) + group.handle("DELETE", relativePath, handlers...) + group.handle("CONNECT", relativePath, handlers...) + group.handle("TRACE", relativePath, handlers...) + return group.returnObj() +} + diff --git a/pkg/net/http/blademaster/server.go b/pkg/net/http/blademaster/server.go index 517f09d65..23e49f4b5 100644 --- a/pkg/net/http/blademaster/server.go +++ b/pkg/net/http/blademaster/server.go @@ -32,7 +32,10 @@ var ( _ IRouter = &Engine{} stats = stat.HTTPServer - _httpDSN string + _httpDSN string + default405Body = []byte("405 method not allowed") + default404Body = []byte("404 page not found") + ) func init() { @@ -123,7 +126,7 @@ type Engine struct { address string - mux *http.ServeMux // http mux router + trees methodTrees server atomic.Value // store *http.Server metastore map[string]map[string]interface{} // metastore is the path as key and the metadata of this path as value, it export via /metadata @@ -131,6 +134,28 @@ type Engine struct { methodConfigs map[string]*MethodConfig injections []injection + + // If enabled, the url.RawPath will be used to find parameters. + UseRawPath bool + + // If true, the path value will be unescaped. + // If UseRawPath is false (by default), the UnescapePathValues effectively is true, + // as url.Path gonna be used, which is already unescaped. + UnescapePathValues bool + + // If enabled, the router checks if another method is allowed for the + // current route, if the current request can not be routed. + // If this is the case, the request is answered with 'Method Not Allowed' + // and HTTP status code 405. + // If no other Method is allowed, the request is delegated to the NotFound + // handler. + HandleMethodNotAllowed bool + + allNoRoute []HandlerFunc + allNoMethod []HandlerFunc + noRoute []HandlerFunc + noMethod []HandlerFunc + } type injection struct { @@ -152,18 +177,28 @@ func NewServer(conf *ServerConfig) *Engine { basePath: "/", root: true, }, - address: ip.InternalIP(), - mux: http.NewServeMux(), - metastore: make(map[string]map[string]interface{}), - methodConfigs: make(map[string]*MethodConfig), - } - if err := engine.SetConfig(conf); err != nil { - panic(err) + conf: &ServerConfig{ + Timeout: xtime.Duration(time.Second), + }, + address: ip.InternalIP(), + trees: make(methodTrees, 0, 9), + metastore: make(map[string]map[string]interface{}), + methodConfigs: make(map[string]*MethodConfig), + HandleMethodNotAllowed: true, + injections: make([]injection, 0), } engine.RouterGroup.engine = engine // NOTE add prometheus monitor location engine.addRoute("GET", "/metrics", monitor()) engine.addRoute("GET", "/metadata", engine.metadata()) + engine.NoRoute(func(c *Context) { + c.Bytes(404, "text/plain", default404Body) + c.Abort() + }) + engine.NoMethod(func(c *Context) { + c.Bytes(405, "text/plain", []byte(http.StatusText(405))) + c.Abort() + }) startPerf() return engine } @@ -196,48 +231,62 @@ func (engine *Engine) addRoute(method, path string, handlers ...HandlerFunc) { engine.metastore[path] = make(map[string]interface{}) } engine.metastore[path]["method"] = method - engine.mux.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) { - c := &Context{ - Context: nil, - engine: engine, - index: -1, - handlers: nil, - Keys: nil, - method: "", - Error: nil, - } + root := engine.trees.get(method) + if root == nil { + root = new(node) + engine.trees = append(engine.trees, methodTree{method: method, root: root}) + } - c.Request = req - c.Writer = w - c.handlers = handlers + prelude := func(c *Context) { c.method = method - - engine.handleContext(c) - }) + c.RoutePath = path + } + handlers = append([]HandlerFunc{prelude}, handlers...) + root.addRoute(path, handlers) } -// SetConfig is used to set the engine configuration. -// Only the valid config will be loaded. -func (engine *Engine) SetConfig(conf *ServerConfig) (err error) { - if conf.Timeout <= 0 { - return errors.New("blademaster: config timeout must greater than 0") +func (engine *Engine) prepareHandler(c *Context) { + httpMethod := c.Request.Method + rPath := c.Request.URL.Path + unescape := false + if engine.UseRawPath && len(c.Request.URL.RawPath) > 0 { + rPath = c.Request.URL.RawPath + unescape = engine.UnescapePathValues } - if conf.Network == "" { - conf.Network = "tcp" + rPath = cleanPath(rPath) + + // Find root of the tree for the given HTTP method + t := engine.trees + for i, tl := 0, len(t); i < tl; i++ { + if t[i].method != httpMethod { + continue + } + root := t[i].root + // Find route in tree + handlers, params, _ := root.getValue(rPath, c.Params, unescape) + if handlers != nil { + c.handlers = handlers + c.Params = params + return + } + break } - engine.lock.Lock() - engine.conf = conf - engine.lock.Unlock() + + if engine.HandleMethodNotAllowed { + for _, tree := range engine.trees { + if tree.method == httpMethod { + continue + } + if handlers, _, _ := tree.root.getValue(rPath, nil, unescape); handlers != nil { + c.handlers = engine.allNoMethod + return + } + } + } + c.handlers = engine.allNoRoute return } -func (engine *Engine) methodConfig(path string) *MethodConfig { - engine.pcLock.RLock() - mc := engine.methodConfigs[path] - engine.pcLock.RUnlock() - return mc -} - func (engine *Engine) handleContext(c *Context) { var cancel func() req := c.Request @@ -274,12 +323,35 @@ func (engine *Engine) handleContext(c *Context) { c.Context, cancel = context.WithCancel(ctx) } defer cancel() + engine.prepareHandler(c) c.Next() } +// SetConfig is used to set the engine configuration. +// Only the valid config will be loaded. +func (engine *Engine) SetConfig(conf *ServerConfig) (err error) { + if conf.Timeout <= 0 { + return errors.New("blademaster: config timeout must greater than 0") + } + if conf.Network == "" { + conf.Network = "tcp" + } + engine.lock.Lock() + engine.conf = conf + engine.lock.Unlock() + return +} + +func (engine *Engine) methodConfig(path string) *MethodConfig { + engine.pcLock.RLock() + mc := engine.methodConfigs[path] + engine.pcLock.RUnlock() + return mc +} + // Router return a http.Handler for using http.ListenAndServe() directly. func (engine *Engine) Router() http.Handler { - return engine.mux + return engine } // Server is used to load stored http server. @@ -305,6 +377,8 @@ func (engine *Engine) Shutdown(ctx context.Context) error { // For example, this is the right place for a logger or error management middleware. func (engine *Engine) UseFunc(middleware ...HandlerFunc) IRoutes { engine.RouterGroup.UseFunc(middleware...) + engine.rebuild404Handlers() + engine.rebuild405Handlers() return engine } @@ -333,7 +407,7 @@ func (engine *Engine) Run(addr ...string) (err error) { address := resolveAddress(addr) server := &http.Server{ Addr: address, - Handler: engine.mux, + Handler: engine, } engine.server.Store(server) if err = server.ListenAndServe(); err != nil { @@ -348,7 +422,7 @@ func (engine *Engine) Run(addr ...string) (err error) { func (engine *Engine) RunTLS(addr, certFile, keyFile string) (err error) { server := &http.Server{ Addr: addr, - Handler: engine.mux, + Handler: engine, } engine.server.Store(server) if err = server.ListenAndServeTLS(certFile, keyFile); err != nil { @@ -369,7 +443,7 @@ func (engine *Engine) RunUnix(file string) (err error) { } defer listener.Close() server := &http.Server{ - Handler: engine.mux, + Handler: engine, } engine.server.Store(server) if err = server.Serve(listener); err != nil { @@ -381,7 +455,7 @@ func (engine *Engine) RunUnix(file string) (err error) { // RunServer will serve and start listening HTTP requests by given server and listener. // Note: this method will block the calling goroutine indefinitely unless an error happens. func (engine *Engine) RunServer(server *http.Server, l net.Listener) (err error) { - server.Handler = engine.mux + server.Handler = engine engine.server.Store(server) if err = server.Serve(l); err != nil { err = errors.Wrapf(err, "listen server: %+v/%+v", server, l) @@ -403,3 +477,41 @@ func (engine *Engine) Inject(pattern string, handlers ...HandlerFunc) { handlers: handlers, }) } + +// ServeHTTP conforms to the http.Handler interface. +func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) { + c := &Context{ + Context: nil, + engine: engine, + index: -1, + handlers: nil, + Keys: nil, + method: "", + Error: nil, + } + + c.Request = req + c.Writer = w + + engine.handleContext(c) +} + +// NoRoute adds handlers for NoRoute. It return a 404 code by default. +func (engine *Engine) NoRoute(handlers ...HandlerFunc) { + engine.noRoute = handlers + engine.rebuild404Handlers() +} + +// NoMethod sets the handlers called when... TODO. +func (engine *Engine) NoMethod(handlers ...HandlerFunc) { + engine.noMethod = handlers + engine.rebuild405Handlers() +} + +func (engine *Engine) rebuild404Handlers() { + engine.allNoRoute = engine.combineHandlers(engine.noRoute) +} + +func (engine *Engine) rebuild405Handlers() { + engine.allNoMethod = engine.combineHandlers(engine.noMethod) +} diff --git a/pkg/net/http/blademaster/tree.go b/pkg/net/http/blademaster/tree.go new file mode 100644 index 000000000..eaeec92bd --- /dev/null +++ b/pkg/net/http/blademaster/tree.go @@ -0,0 +1,627 @@ +// Copyright 2013 Julien Schmidt. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be found +// at https://github.com/julienschmidt/httprouter/blob/master/LICENSE + +package blademaster + +import ( +"net/url" +"strings" +"unicode" +) + +// Param is a single URL parameter, consisting of a key and a value. +type Param struct { + Key string + Value string +} + +// Params is a Param-slice, as returned by the router. +// The slice is ordered, the first URL parameter is also the first slice value. +// It is therefore safe to read values by the index. +type Params []Param + +// Get returns the value of the first Param which key matches the given name. +// If no matching Param is found, an empty string is returned. +func (ps Params) Get(name string) (string, bool) { + for _, entry := range ps { + if entry.Key == name { + return entry.Value, true + } + } + return "", false +} + +// ByName returns the value of the first Param which key matches the given name. +// If no matching Param is found, an empty string is returned. +func (ps Params) ByName(name string) (va string) { + va, _ = ps.Get(name) + return +} + +type methodTree struct { + method string + root *node +} + +type methodTrees []methodTree + +func (trees methodTrees) get(method string) *node { + for _, tree := range trees { + if tree.method == method { + return tree.root + } + } + return nil +} + +func min(a, b int) int { + if a <= b { + return a + } + return b +} + +func countParams(path string) uint8 { + var n uint + for i := 0; i < len(path); i++ { + if path[i] != ':' && path[i] != '*' { + continue + } + n++ + } + if n >= 255 { + return 255 + } + return uint8(n) +} + +type nodeType uint8 + +const ( + static nodeType = iota // default + root + param + catchAll +) + +type node struct { + path string + indices string + children []*node + handlers []HandlerFunc + priority uint32 + nType nodeType + maxParams uint8 + wildChild bool +} + +// increments priority of the given child and reorders if necessary. +func (n *node) incrementChildPrio(pos int) int { + n.children[pos].priority++ + prio := n.children[pos].priority + + // adjust position (move to front) + newPos := pos + for newPos > 0 && n.children[newPos-1].priority < prio { + // swap node positions + n.children[newPos-1], n.children[newPos] = n.children[newPos], n.children[newPos-1] + + newPos-- + } + + // build new index char string + if newPos != pos { + n.indices = n.indices[:newPos] + // unchanged prefix, might be empty + n.indices[pos:pos+1] + // the index char we move + n.indices[newPos:pos] + n.indices[pos+1:] // rest without char at 'pos' + } + + return newPos +} + +// addRoute adds a node with the given handle to the path. +// Not concurrency-safe! +func (n *node) addRoute(path string, handlers []HandlerFunc) { + fullPath := path + n.priority++ + numParams := countParams(path) + + // non-empty tree + if len(n.path) > 0 || len(n.children) > 0 { + walk: + for { + // Update maxParams of the current node + if numParams > n.maxParams { + n.maxParams = numParams + } + + // Find the longest common prefix. + // This also implies that the common prefix contains no ':' or '*' + // since the existing key can't contain those chars. + i := 0 + max := min(len(path), len(n.path)) + for i < max && path[i] == n.path[i] { + i++ + } + + // Split edge + if i < len(n.path) { + child := node{ + path: n.path[i:], + wildChild: n.wildChild, + indices: n.indices, + children: n.children, + handlers: n.handlers, + priority: n.priority - 1, + } + + // Update maxParams (max of all children) + for i := range child.children { + if child.children[i].maxParams > child.maxParams { + child.maxParams = child.children[i].maxParams + } + } + + n.children = []*node{&child} + // []byte for proper unicode char conversion, see #65 + n.indices = string([]byte{n.path[i]}) + n.path = path[:i] + n.handlers = nil + n.wildChild = false + } + + // Make new node a child of this node + if i < len(path) { + path = path[i:] + + if n.wildChild { + n = n.children[0] + n.priority++ + + // Update maxParams of the child node + if numParams > n.maxParams { + n.maxParams = numParams + } + numParams-- + + // Check if the wildcard matches + if len(path) >= len(n.path) && n.path == path[:len(n.path)] { + // check for longer wildcard, e.g. :name and :names + if len(n.path) >= len(path) || path[len(n.path)] == '/' { + continue walk + } + } + + pathSeg := path + if n.nType != catchAll { + pathSeg = strings.SplitN(path, "/", 2)[0] + } + prefix := fullPath[:strings.Index(fullPath, pathSeg)] + n.path + panic("'" + pathSeg + + "' in new path '" + fullPath + + "' conflicts with existing wildcard '" + n.path + + "' in existing prefix '" + prefix + + "'") + } + + c := path[0] + + // slash after param + if n.nType == param && c == '/' && len(n.children) == 1 { + n = n.children[0] + n.priority++ + continue walk + } + + // Check if a child with the next path byte exists + for i := 0; i < len(n.indices); i++ { + if c == n.indices[i] { + i = n.incrementChildPrio(i) + n = n.children[i] + continue walk + } + } + + // Otherwise insert it + if c != ':' && c != '*' { + // []byte for proper unicode char conversion, see #65 + n.indices += string([]byte{c}) + child := &node{ + maxParams: numParams, + } + n.children = append(n.children, child) + n.incrementChildPrio(len(n.indices) - 1) + n = child + } + n.insertChild(numParams, path, fullPath, handlers) + return + + } else if i == len(path) { // Make node a (in-path) leaf + if n.handlers != nil { + panic("handlers are already registered for path '" + fullPath + "'") + } + n.handlers = handlers + } + return + } + } else { // Empty tree + n.insertChild(numParams, path, fullPath, handlers) + n.nType = root + } +} + +func (n *node) insertChild(numParams uint8, path string, fullPath string, handlers []HandlerFunc) { + var offset int // already handled bytes of the path + + // find prefix until first wildcard (beginning with ':' or '*') + for i, max := 0, len(path); numParams > 0; i++ { + c := path[i] + if c != ':' && c != '*' { + continue + } + + // find wildcard end (either '/' or path end) + end := i + 1 + for end < max && path[end] != '/' { + switch path[end] { + // the wildcard name must not contain ':' and '*' + case ':', '*': + panic("only one wildcard per path segment is allowed, has: '" + + path[i:] + "' in path '" + fullPath + "'") + default: + end++ + } + } + + // check if this Node existing children which would be + // unreachable if we insert the wildcard here + if len(n.children) > 0 { + panic("wildcard route '" + path[i:end] + + "' conflicts with existing children in path '" + fullPath + "'") + } + + // check if the wildcard has a name + if end-i < 2 { + panic("wildcards must be named with a non-empty name in path '" + fullPath + "'") + } + + if c == ':' { // param + // split path at the beginning of the wildcard + if i > 0 { + n.path = path[offset:i] + offset = i + } + + child := &node{ + nType: param, + maxParams: numParams, + } + n.children = []*node{child} + n.wildChild = true + n = child + n.priority++ + numParams-- + + // if the path doesn't end with the wildcard, then there + // will be another non-wildcard subpath starting with '/' + if end < max { + n.path = path[offset:end] + offset = end + + child := &node{ + maxParams: numParams, + priority: 1, + } + n.children = []*node{child} + n = child + } + + } else { // catchAll + if end != max || numParams > 1 { + panic("catch-all routes are only allowed at the end of the path in path '" + fullPath + "'") + } + + if len(n.path) > 0 && n.path[len(n.path)-1] == '/' { + panic("catch-all conflicts with existing handle for the path segment root in path '" + fullPath + "'") + } + + // currently fixed width 1 for '/' + i-- + if path[i] != '/' { + panic("no / before catch-all in path '" + fullPath + "'") + } + + n.path = path[offset:i] + + // first node: catchAll node with empty path + child := &node{ + wildChild: true, + nType: catchAll, + maxParams: 1, + } + n.children = []*node{child} + n.indices = string(path[i]) + n = child + n.priority++ + + // second node: node holding the variable + child = &node{ + path: path[i:], + nType: catchAll, + maxParams: 1, + handlers: handlers, + priority: 1, + } + n.children = []*node{child} + + return + } + } + + // insert remaining path part and handle to the leaf + n.path = path[offset:] + n.handlers = handlers +} + +// getValue returns the handle registered with the given path (key). The values of +// wildcards are saved to a map. +// If no handle can be found, a TSR (trailing slash redirect) recommendation is +// made if a handle exists with an extra (without the) trailing slash for the +// given path. +func (n *node) getValue(path string, po Params, unescape bool) (handlers []HandlerFunc, p Params, tsr bool) { + p = po +walk: // Outer loop for walking the tree + for { + if len(path) > len(n.path) { + if path[:len(n.path)] == n.path { + path = path[len(n.path):] + // If this node does not have a wildcard (param or catchAll) + // child, we can just look up the next child node and continue + // to walk down the tree + if !n.wildChild { + c := path[0] + for i := 0; i < len(n.indices); i++ { + if c == n.indices[i] { + n = n.children[i] + continue walk + } + } + + // Nothing found. + // We can recommend to redirect to the same URL without a + // trailing slash if a leaf exists for that path. + tsr = path == "/" && n.handlers != nil + return + } + + // handle wildcard child + n = n.children[0] + switch n.nType { + case param: + // find param end (either '/' or path end) + end := 0 + for end < len(path) && path[end] != '/' { + end++ + } + + // save param value + if cap(p) < int(n.maxParams) { + p = make(Params, 0, n.maxParams) + } + i := len(p) + p = p[:i+1] // expand slice within preallocated capacity + p[i].Key = n.path[1:] + val := path[:end] + if unescape { + var err error + if p[i].Value, err = url.QueryUnescape(val); err != nil { + p[i].Value = val // fallback, in case of error + } + } else { + p[i].Value = val + } + + // we need to go deeper! + if end < len(path) { + if len(n.children) > 0 { + path = path[end:] + n = n.children[0] + continue walk + } + + // ... but we can't + tsr = len(path) == end+1 + return + } + + if handlers = n.handlers; handlers != nil { + return + } + if len(n.children) == 1 { + // No handle found. Check if a handle for this path + a + // trailing slash exists for TSR recommendation + n = n.children[0] + tsr = n.path == "/" && n.handlers != nil + } + + return + + case catchAll: + // save param value + if cap(p) < int(n.maxParams) { + p = make(Params, 0, n.maxParams) + } + i := len(p) + p = p[:i+1] // expand slice within preallocated capacity + p[i].Key = n.path[2:] + if unescape { + var err error + if p[i].Value, err = url.QueryUnescape(path); err != nil { + p[i].Value = path // fallback, in case of error + } + } else { + p[i].Value = path + } + + handlers = n.handlers + return + + default: + panic("invalid node type") + } + } + } else if path == n.path { + // We should have reached the node containing the handle. + // Check if this node has a handle registered. + if handlers = n.handlers; handlers != nil { + return + } + + if path == "/" && n.wildChild && n.nType != root { + tsr = true + return + } + + // No handle found. Check if a handle for this path + a + // trailing slash exists for trailing slash recommendation + for i := 0; i < len(n.indices); i++ { + if n.indices[i] == '/' { + n = n.children[i] + tsr = (len(n.path) == 1 && n.handlers != nil) || + (n.nType == catchAll && n.children[0].handlers != nil) + return + } + } + + return + } + + // Nothing found. We can recommend to redirect to the same URL with an + // extra trailing slash if a leaf exists for that path + tsr = (path == "/") || + (len(n.path) == len(path)+1 && n.path[len(path)] == '/' && + path == n.path[:len(n.path)-1] && n.handlers != nil) + return + } +} + +// findCaseInsensitivePath makes a case-insensitive lookup of the given path and tries to find a handler. +// It can optionally also fix trailing slashes. +// It returns the case-corrected path and a bool indicating whether the lookup +// was successful. +func (n *node) findCaseInsensitivePath(path string, fixTrailingSlash bool) (ciPath []byte, found bool) { + ciPath = make([]byte, 0, len(path)+1) // preallocate enough memory + + // Outer loop for walking the tree + for len(path) >= len(n.path) && strings.ToLower(path[:len(n.path)]) == strings.ToLower(n.path) { + path = path[len(n.path):] + ciPath = append(ciPath, n.path...) + + if len(path) > 0 { + // If this node does not have a wildcard (param or catchAll) child, + // we can just look up the next child node and continue to walk down + // the tree + if !n.wildChild { + r := unicode.ToLower(rune(path[0])) + for i, index := range n.indices { + // must use recursive approach since both index and + // ToLower(index) could exist. We must check both. + if r == unicode.ToLower(index) { + out, found := n.children[i].findCaseInsensitivePath(path, fixTrailingSlash) + if found { + return append(ciPath, out...), true + } + } + } + + // Nothing found. We can recommend to redirect to the same URL + // without a trailing slash if a leaf exists for that path + found = fixTrailingSlash && path == "/" && n.handlers != nil + return + } + + n = n.children[0] + switch n.nType { + case param: + // find param end (either '/' or path end) + k := 0 + for k < len(path) && path[k] != '/' { + k++ + } + + // add param value to case insensitive path + ciPath = append(ciPath, path[:k]...) + + // we need to go deeper! + if k < len(path) { + if len(n.children) > 0 { + path = path[k:] + n = n.children[0] + continue + } + + // ... but we can't + if fixTrailingSlash && len(path) == k+1 { + return ciPath, true + } + return + } + + if n.handlers != nil { + return ciPath, true + } else if fixTrailingSlash && len(n.children) == 1 { + // No handle found. Check if a handle for this path + a + // trailing slash exists + n = n.children[0] + if n.path == "/" && n.handlers != nil { + return append(ciPath, '/'), true + } + } + return + + case catchAll: + return append(ciPath, path...), true + + default: + panic("invalid node type") + } + } else { + // We should have reached the node containing the handle. + // Check if this node has a handle registered. + if n.handlers != nil { + return ciPath, true + } + + // No handle found. + // Try to fix the path by adding a trailing slash + if fixTrailingSlash { + for i := 0; i < len(n.indices); i++ { + if n.indices[i] == '/' { + n = n.children[i] + if (len(n.path) == 1 && n.handlers != nil) || + (n.nType == catchAll && n.children[0].handlers != nil) { + return append(ciPath, '/'), true + } + return + } + } + } + return + } + } + + // Nothing found. + // Try to fix the path by adding / removing a trailing slash + if fixTrailingSlash { + if path == "/" { + return ciPath, true + } + if len(path)+1 == len(n.path) && n.path[len(path)] == '/' && + strings.ToLower(path) == strings.ToLower(n.path[:len(path)]) && + n.handlers != nil { + return append(ciPath, n.path...), true + } + } + return +} diff --git a/pkg/net/http/blademaster/utils.go b/pkg/net/http/blademaster/utils.go index 99ee1fdf3..7bd86034f 100644 --- a/pkg/net/http/blademaster/utils.go +++ b/pkg/net/http/blademaster/utils.go @@ -29,8 +29,10 @@ func resolveAddress(addr []string) string { switch len(addr) { case 0: if port := os.Getenv("PORT"); port != "" { + //debugPrint("Environment variable PORT=\"%s\"", port) return ":" + port } + //debugPrint("Environment variable PORT is undefined. Using port :8080 by default") return ":8080" case 1: return addr[0] @@ -38,3 +40,120 @@ func resolveAddress(addr []string) string { panic("too much parameters") } } + +// cleanPath is the URL version of path.Clean, it returns a canonical URL path +// for p, eliminating . and .. elements. +// +// The following rules are applied iteratively until no further processing can +// be done: +// 1. Replace multiple slashes with a single slash. +// 2. Eliminate each . path name element (the current directory). +// 3. Eliminate each inner .. path name element (the parent directory) +// along with the non-.. element that precedes it. +// 4. Eliminate .. elements that begin a rooted path: +// that is, replace "/.." by "/" at the beginning of a path. +// +// If the result of this process is an empty string, "/" is returned. +func cleanPath(p string) string { + // Turn empty string into "/" + if p == "" { + return "/" + } + + n := len(p) + var buf []byte + + // Invariants: + // reading from path; r is index of next byte to process. + // writing to buf; w is index of next byte to write. + + // path must start with '/' + r := 1 + w := 1 + + if p[0] != '/' { + r = 0 + buf = make([]byte, n+1) + buf[0] = '/' + } + + trailing := n > 1 && p[n-1] == '/' + + // A bit more clunky without a 'lazybuf' like the path package, but the loop + // gets completely inlined (bufApp). So in contrast to the path package this + // loop has no expensive function calls (except 1x make) + + for r < n { + switch { + case p[r] == '/': + // empty path element, trailing slash is added after the end + r++ + + case p[r] == '.' && r+1 == n: + trailing = true + r++ + + case p[r] == '.' && p[r+1] == '/': + // . element + r += 2 + + case p[r] == '.' && p[r+1] == '.' && (r+2 == n || p[r+2] == '/'): + // .. element: remove to last / + r += 3 + + if w > 1 { + // can backtrack + w-- + + if buf == nil { + for w > 1 && p[w] != '/' { + w-- + } + } else { + for w > 1 && buf[w] != '/' { + w-- + } + } + } + + default: + // real path element. + // add slash if needed + if w > 1 { + bufApp(&buf, p, w, '/') + w++ + } + + // copy element + for r < n && p[r] != '/' { + bufApp(&buf, p, w, p[r]) + w++ + r++ + } + } + } + + // re-append trailing slash + if trailing && w > 1 { + bufApp(&buf, p, w, '/') + w++ + } + + if buf == nil { + return p[:w] + } + return string(buf[:w]) +} + +// internal helper to lazily create a buffer if necessary. +func bufApp(buf *[]byte, s string, w int, c byte) { + if *buf == nil { + if s[w] == c { + return + } + + *buf = make([]byte, len(s)) + copy(*buf, s[:w]) + } + (*buf)[w] = c +} diff --git a/pkg/net/trace/config.go b/pkg/net/trace/config.go index c43e09e71..7741dc147 100644 --- a/pkg/net/trace/config.go +++ b/pkg/net/trace/config.go @@ -58,7 +58,7 @@ func TracerFromEnvFlag() (Tracer, error) { return nil, err } report := newReport(cfg.Network, cfg.Addr, time.Duration(cfg.Timeout), cfg.ProtocolVersion) - return newTracer(env.AppID, report, cfg), nil + return NewTracer(env.AppID, report, cfg.DisableSample), nil } // Init init trace report. @@ -71,5 +71,5 @@ func Init(cfg *Config) { } } report := newReport(cfg.Network, cfg.Addr, time.Duration(cfg.Timeout), cfg.ProtocolVersion) - SetGlobalTracer(newTracer(env.AppID, report, cfg)) + SetGlobalTracer(NewTracer(env.AppID, report, cfg.DisableSample)) } diff --git a/pkg/net/trace/context.go b/pkg/net/trace/context.go index e3d20bd94..1aab864c9 100644 --- a/pkg/net/trace/context.go +++ b/pkg/net/trace/context.go @@ -19,39 +19,39 @@ var ( // SpanContext implements opentracing.SpanContext type spanContext struct { - // traceID represents globally unique ID of the trace. + // TraceID represents globally unique ID of the trace. // Usually generated as a random number. - traceID uint64 + TraceID uint64 - // spanID represents span ID that must be unique within its trace, + // SpanID represents span ID that must be unique within its trace, // but does not have to be globally unique. - spanID uint64 + SpanID uint64 - // parentID refers to the ID of the parent span. + // ParentID refers to the ID of the parent span. // Should be 0 if the current span is a root span. - parentID uint64 + ParentID uint64 - // flags is a bitmap containing such bits as 'sampled' and 'debug'. - flags byte + // Flags is a bitmap containing such bits as 'sampled' and 'debug'. + Flags byte - // probability - probability float32 + // Probability + Probability float32 - // current level - level int + // Level current level + Level int } func (c spanContext) isSampled() bool { - return (c.flags & flagSampled) == flagSampled + return (c.Flags & flagSampled) == flagSampled } func (c spanContext) isDebug() bool { - return (c.flags & flagDebug) == flagDebug + return (c.Flags & flagDebug) == flagDebug } // IsValid check spanContext valid func (c spanContext) IsValid() bool { - return c.traceID != 0 && c.spanID != 0 + return c.TraceID != 0 && c.SpanID != 0 } // emptyContext emptyContext @@ -69,10 +69,10 @@ var emptyContext = spanContext{} // sample-rate: s-{base16(BigEndian(float32))} func (c spanContext) String() string { base := make([]string, 4) - base[0] = strconv.FormatUint(uint64(c.traceID), 16) - base[1] = strconv.FormatUint(uint64(c.spanID), 16) - base[2] = strconv.FormatUint(uint64(c.parentID), 16) - base[3] = strconv.FormatUint(uint64(c.flags), 16) + base[0] = strconv.FormatUint(uint64(c.TraceID), 16) + base[1] = strconv.FormatUint(uint64(c.SpanID), 16) + base[2] = strconv.FormatUint(uint64(c.ParentID), 16) + base[3] = strconv.FormatUint(uint64(c.Flags), 16) return strings.Join(base, ":") } @@ -101,10 +101,10 @@ func contextFromString(value string) (spanContext, error) { return emptyContext, errInvalidTracerString } sctx := spanContext{ - traceID: rets[0], - spanID: rets[1], - parentID: rets[2], - flags: byte(rets[3]), + TraceID: rets[0], + SpanID: rets[1], + ParentID: rets[2], + Flags: byte(rets[3]), } return sctx, nil } diff --git a/pkg/net/trace/context_test.go b/pkg/net/trace/context_test.go index d2d85baf9..fb5e02f92 100644 --- a/pkg/net/trace/context_test.go +++ b/pkg/net/trace/context_test.go @@ -6,10 +6,10 @@ import ( func TestSpanContext(t *testing.T) { pctx := &spanContext{ - parentID: genID(), - spanID: genID(), - traceID: genID(), - flags: flagSampled, + ParentID: genID(), + SpanID: genID(), + TraceID: genID(), + Flags: flagSampled, } if !pctx.isSampled() { t.Error("expect sampled") @@ -20,7 +20,7 @@ func TestSpanContext(t *testing.T) { if err != nil { t.Error(err) } - if pctx2.parentID != pctx.parentID || pctx2.spanID != pctx.spanID || pctx2.traceID != pctx.traceID || pctx2.flags != pctx.flags { + if pctx2.ParentID != pctx.ParentID || pctx2.SpanID != pctx.SpanID || pctx2.TraceID != pctx.TraceID || pctx2.Flags != pctx.Flags { t.Errorf("wrong spancontext get %+v -> %+v", pctx, pctx2) } } diff --git a/pkg/net/trace/dapper.go b/pkg/net/trace/dapper.go index 5a1675ddf..88934b103 100644 --- a/pkg/net/trace/dapper.go +++ b/pkg/net/trace/dapper.go @@ -8,21 +8,21 @@ import ( ) const ( - _maxLevel = 64 + _maxLevel = 64 + // hard code reset probability at 0.00025, 1/4000 _probability = 0.00025 ) -func newTracer(serviceName string, report reporter, cfg *Config) Tracer { - // hard code reset probability at 0.00025, 1/4000 - cfg.Probability = _probability - sampler := newSampler(cfg.Probability) +// NewTracer new a tracer. +func NewTracer(serviceName string, report reporter, disableSample bool) Tracer { + sampler := newSampler(_probability) // default internal tags tags := extendTag() stdlog := log.New(os.Stderr, "trace", log.LstdFlags) return &dapper{ - cfg: cfg, - serviceName: serviceName, + serviceName: serviceName, + disableSample: disableSample, propagators: map[interface{}]propagator{ HTTPFormat: httpPropagator{}, GRPCFormat: grpcPropagator{}, @@ -30,20 +30,20 @@ func newTracer(serviceName string, report reporter, cfg *Config) Tracer { reporter: report, sampler: sampler, tags: tags, - pool: &sync.Pool{New: func() interface{} { return new(span) }}, + pool: &sync.Pool{New: func() interface{} { return new(Span) }}, stdlog: stdlog, } } type dapper struct { - cfg *Config - serviceName string - tags []Tag - reporter reporter - propagators map[interface{}]propagator - pool *sync.Pool - stdlog *log.Logger - sampler sampler + serviceName string + disableSample bool + tags []Tag + reporter reporter + propagators map[interface{}]propagator + pool *sync.Pool + stdlog *log.Logger + sampler sampler } func (d *dapper) New(operationName string, opts ...Option) Trace { @@ -54,19 +54,19 @@ func (d *dapper) New(operationName string, opts ...Option) Trace { traceID := genID() var sampled bool var probability float32 - if d.cfg.DisableSample { + if d.disableSample { sampled = true probability = 1 } else { sampled, probability = d.sampler.IsSampled(traceID, operationName) } - pctx := spanContext{traceID: traceID} + pctx := spanContext{TraceID: traceID} if sampled { - pctx.flags = flagSampled - pctx.probability = probability + pctx.Flags = flagSampled + pctx.Probability = probability } if opt.Debug { - pctx.flags |= flagDebug + pctx.Flags |= flagDebug return d.newSpanWithContext(operationName, pctx).SetTag(TagString(TagSpanKind, "server")).SetTag(TagBool("debug", true)) } // 为了兼容临时为 New 的 Span 设置 span.kind @@ -80,21 +80,21 @@ func (d *dapper) newSpanWithContext(operationName string, pctx spanContext) Trac // sp.context = pctx // return sp //} - if pctx.level > _maxLevel { + if pctx.Level > _maxLevel { // if span reach max limit level return noopspan return noopspan{} } - level := pctx.level + 1 + level := pctx.Level + 1 nctx := spanContext{ - traceID: pctx.traceID, - parentID: pctx.spanID, - flags: pctx.flags, - level: level, + TraceID: pctx.TraceID, + ParentID: pctx.SpanID, + Flags: pctx.Flags, + Level: level, } - if pctx.spanID == 0 { - nctx.spanID = pctx.traceID + if pctx.SpanID == 0 { + nctx.SpanID = pctx.TraceID } else { - nctx.spanID = genID() + nctx.SpanID = genID() } sp.operationName = operationName sp.context = nctx @@ -160,7 +160,7 @@ func (d *dapper) Close() error { return d.reporter.Close() } -func (d *dapper) report(sp *span) { +func (d *dapper) report(sp *Span) { if sp.context.isSampled() { if err := d.reporter.WriteSpan(sp); err != nil { d.stdlog.Printf("marshal trace span error: %s", err) @@ -169,7 +169,7 @@ func (d *dapper) report(sp *span) { d.putSpan(sp) } -func (d *dapper) putSpan(sp *span) { +func (d *dapper) putSpan(sp *Span) { if len(sp.tags) > 32 { sp.tags = nil } @@ -179,8 +179,8 @@ func (d *dapper) putSpan(sp *span) { d.pool.Put(sp) } -func (d *dapper) getSpan() *span { - sp := d.pool.Get().(*span) +func (d *dapper) getSpan() *Span { + sp := d.pool.Get().(*Span) sp.dapper = d sp.childs = 0 sp.tags = sp.tags[:0] diff --git a/pkg/net/trace/dapper_test.go b/pkg/net/trace/dapper_test.go index f5e3ab7da..0ee9cbbf0 100644 --- a/pkg/net/trace/dapper_test.go +++ b/pkg/net/trace/dapper_test.go @@ -10,10 +10,10 @@ import ( ) type mockReport struct { - sps []*span + sps []*Span } -func (m *mockReport) WriteSpan(sp *span) error { +func (m *mockReport) WriteSpan(sp *Span) error { m.sps = append(m.sps, sp) return nil } @@ -25,8 +25,8 @@ func (m *mockReport) Close() error { func TestDapperPropagation(t *testing.T) { t.Run("test HTTP progagation", func(t *testing.T) { report := &mockReport{} - t1 := newTracer("service1", report, &Config{DisableSample: true}) - t2 := newTracer("service2", report, &Config{DisableSample: true}) + t1 := NewTracer("service1", report, true) + t2 := NewTracer("service2", report, true) sp1 := t1.New("opt_1") sp2 := sp1.Fork("", "opt_client") header := make(http.Header) @@ -40,17 +40,17 @@ func TestDapperPropagation(t *testing.T) { sp1.Finish(nil) assert.Len(t, report.sps, 3) - assert.Equal(t, report.sps[2].context.parentID, uint64(0)) - assert.Equal(t, report.sps[0].context.traceID, report.sps[1].context.traceID) - assert.Equal(t, report.sps[2].context.traceID, report.sps[1].context.traceID) + assert.Equal(t, report.sps[2].context.ParentID, uint64(0)) + assert.Equal(t, report.sps[0].context.TraceID, report.sps[1].context.TraceID) + assert.Equal(t, report.sps[2].context.TraceID, report.sps[1].context.TraceID) - assert.Equal(t, report.sps[1].context.parentID, report.sps[2].context.spanID) - assert.Equal(t, report.sps[0].context.parentID, report.sps[1].context.spanID) + assert.Equal(t, report.sps[1].context.ParentID, report.sps[2].context.SpanID) + assert.Equal(t, report.sps[0].context.ParentID, report.sps[1].context.SpanID) }) t.Run("test gRPC progagation", func(t *testing.T) { report := &mockReport{} - t1 := newTracer("service1", report, &Config{DisableSample: true}) - t2 := newTracer("service2", report, &Config{DisableSample: true}) + t1 := NewTracer("service1", report, true) + t2 := NewTracer("service2", report, true) sp1 := t1.New("opt_1") sp2 := sp1.Fork("", "opt_client") md := make(metadata.MD) @@ -64,23 +64,23 @@ func TestDapperPropagation(t *testing.T) { sp1.Finish(nil) assert.Len(t, report.sps, 3) - assert.Equal(t, report.sps[2].context.parentID, uint64(0)) - assert.Equal(t, report.sps[0].context.traceID, report.sps[1].context.traceID) - assert.Equal(t, report.sps[2].context.traceID, report.sps[1].context.traceID) + assert.Equal(t, report.sps[2].context.ParentID, uint64(0)) + assert.Equal(t, report.sps[0].context.TraceID, report.sps[1].context.TraceID) + assert.Equal(t, report.sps[2].context.TraceID, report.sps[1].context.TraceID) - assert.Equal(t, report.sps[1].context.parentID, report.sps[2].context.spanID) - assert.Equal(t, report.sps[0].context.parentID, report.sps[1].context.spanID) + assert.Equal(t, report.sps[1].context.ParentID, report.sps[2].context.SpanID) + assert.Equal(t, report.sps[0].context.ParentID, report.sps[1].context.SpanID) }) t.Run("test normal", func(t *testing.T) { report := &mockReport{} - t1 := newTracer("service1", report, &Config{Probability: 0.000000001}) + t1 := NewTracer("service1", report, true) sp1 := t1.New("test123") sp1.Finish(nil) }) t.Run("test debug progagation", func(t *testing.T) { report := &mockReport{} - t1 := newTracer("service1", report, &Config{}) - t2 := newTracer("service2", report, &Config{}) + t1 := NewTracer("service1", report, true) + t2 := NewTracer("service2", report, true) sp1 := t1.New("opt_1", EnableDebug()) sp2 := sp1.Fork("", "opt_client") header := make(http.Header) @@ -94,19 +94,19 @@ func TestDapperPropagation(t *testing.T) { sp1.Finish(nil) assert.Len(t, report.sps, 3) - assert.Equal(t, report.sps[2].context.parentID, uint64(0)) - assert.Equal(t, report.sps[0].context.traceID, report.sps[1].context.traceID) - assert.Equal(t, report.sps[2].context.traceID, report.sps[1].context.traceID) + assert.Equal(t, report.sps[2].context.ParentID, uint64(0)) + assert.Equal(t, report.sps[0].context.TraceID, report.sps[1].context.TraceID) + assert.Equal(t, report.sps[2].context.TraceID, report.sps[1].context.TraceID) - assert.Equal(t, report.sps[1].context.parentID, report.sps[2].context.spanID) - assert.Equal(t, report.sps[0].context.parentID, report.sps[1].context.spanID) + assert.Equal(t, report.sps[1].context.ParentID, report.sps[2].context.SpanID) + assert.Equal(t, report.sps[0].context.ParentID, report.sps[1].context.SpanID) }) } func BenchmarkSample(b *testing.B) { err := fmt.Errorf("test error") report := &mockReport{} - t1 := newTracer("service1", report, &Config{}) + t1 := NewTracer("service1", report, true) for i := 0; i < b.N; i++ { sp1 := t1.New("test_opt1") sp1.SetTag(TagString("test", "123")) @@ -122,7 +122,7 @@ func BenchmarkSample(b *testing.B) { func BenchmarkDisableSample(b *testing.B) { err := fmt.Errorf("test error") report := &mockReport{} - t1 := newTracer("service1", report, &Config{DisableSample: true}) + t1 := NewTracer("service1", report, true) for i := 0; i < b.N; i++ { sp1 := t1.New("test_opt1") sp1.SetTag(TagString("test", "123")) diff --git a/pkg/net/trace/marshal.go b/pkg/net/trace/marshal.go index 6324694e0..5553bdf84 100644 --- a/pkg/net/trace/marshal.go +++ b/pkg/net/trace/marshal.go @@ -20,22 +20,22 @@ var ( errSpanVersion = errs.New("trace: marshal not support version") ) -func marshalSpan(sp *span, version int32) ([]byte, error) { +func marshalSpan(sp *Span, version int32) ([]byte, error) { if version == protoVersion1 { return marshalSpanV1(sp) } return nil, errSpanVersion } -func marshalSpanV1(sp *span) ([]byte, error) { +func marshalSpanV1(sp *Span) ([]byte, error) { protoSpan := new(protogen.Span) protoSpan.Version = protoVersion1 protoSpan.ServiceName = sp.dapper.serviceName protoSpan.OperationName = sp.operationName - protoSpan.TraceId = sp.context.traceID - protoSpan.SpanId = sp.context.spanID - protoSpan.ParentId = sp.context.parentID - protoSpan.SamplingProbability = sp.context.probability + protoSpan.TraceId = sp.context.TraceID + protoSpan.SpanId = sp.context.SpanID + protoSpan.ParentId = sp.context.ParentID + protoSpan.SamplingProbability = sp.context.Probability protoSpan.StartTime = ×tamp.Timestamp{ Seconds: sp.startTime.Unix(), Nanos: int32(sp.startTime.Nanosecond()), diff --git a/pkg/net/trace/marshal_test.go b/pkg/net/trace/marshal_test.go index 30fe50cc9..3c4d5831d 100644 --- a/pkg/net/trace/marshal_test.go +++ b/pkg/net/trace/marshal_test.go @@ -6,8 +6,8 @@ import ( func TestMarshalSpanV1(t *testing.T) { report := &mockReport{} - t1 := newTracer("service1", report, &Config{DisableSample: true}) - sp1 := t1.New("opt_test").(*span) + t1 := NewTracer("service1", report, true) + sp1 := t1.New("opt_test").(*Span) sp1.SetLog(Log("hello", "test123")) sp1.SetTag(TagString("tag1", "hell"), TagBool("booltag", true), TagFloat64("float64tag", 3.14159)) sp1.Finish(nil) diff --git a/pkg/net/trace/report.go b/pkg/net/trace/report.go index e9b8e4122..6bce7ea28 100644 --- a/pkg/net/trace/report.go +++ b/pkg/net/trace/report.go @@ -20,7 +20,7 @@ const ( // reporter trace reporter. type reporter interface { - WriteSpan(sp *span) error + WriteSpan(sp *Span) error Close() error } @@ -64,7 +64,7 @@ func (c *connReport) daemon() { c.done <- struct{}{} } -func (c *connReport) WriteSpan(sp *span) error { +func (c *connReport) WriteSpan(sp *Span) error { data, err := marshalSpan(sp, c.version) if err != nil { return err diff --git a/pkg/net/trace/span.go b/pkg/net/trace/span.go index 74d9eb756..635785f69 100644 --- a/pkg/net/trace/span.go +++ b/pkg/net/trace/span.go @@ -13,9 +13,10 @@ const ( _maxLogs = 256 ) -var _ Trace = &span{} +var _ Trace = &Span{} -type span struct { +// Span is a trace span. +type Span struct { dapper *dapper context spanContext operationName string @@ -26,11 +27,35 @@ type span struct { childs int } -func (s *span) TraceID() string { +func (s *Span) Name() string { + return s.operationName +} + +func (s *Span) StartTime() time.Time { + return s.startTime +} + +func (s *Span) Duration() time.Duration { + return s.duration +} + +func (s *Span) TraceID() string { return s.context.String() } -func (s *span) Fork(serviceName, operationName string) Trace { +func (s *Span) Context() spanContext { + return s.context +} + +func (s *Span) Tags() []Tag { + return s.tags +} + +func (s *Span) Logs() []*protogen.Log { + return s.logs +} + +func (s *Span) Fork(serviceName, operationName string) Trace { if s.childs > _maxChilds { // if child span more than max childs set return noopspan return noopspan{} @@ -40,11 +65,11 @@ func (s *span) Fork(serviceName, operationName string) Trace { return s.dapper.newSpanWithContext(operationName, s.context).SetTag(TagString(TagSpanKind, "client")) } -func (s *span) Follow(serviceName, operationName string) Trace { +func (s *Span) Follow(serviceName, operationName string) Trace { return s.Fork(serviceName, operationName).SetTag(TagString(TagSpanKind, "producer")) } -func (s *span) Finish(perr *error) { +func (s *Span) Finish(perr *error) { s.duration = time.Since(s.startTime) if perr != nil && *perr != nil { err := *perr @@ -57,7 +82,7 @@ func (s *span) Finish(perr *error) { s.dapper.report(s) } -func (s *span) SetTag(tags ...Tag) Trace { +func (s *Span) SetTag(tags ...Tag) Trace { if !s.context.isSampled() && !s.context.isDebug() { return s } @@ -72,7 +97,7 @@ func (s *span) SetTag(tags ...Tag) Trace { // LogFields is an efficient and type-checked way to record key:value // NOTE current unsupport -func (s *span) SetLog(logs ...LogField) Trace { +func (s *Span) SetLog(logs ...LogField) Trace { if !s.context.isSampled() && !s.context.isDebug() { return s } @@ -85,7 +110,7 @@ func (s *span) SetLog(logs ...LogField) Trace { return s } -func (s *span) setLog(logs ...LogField) Trace { +func (s *Span) setLog(logs ...LogField) Trace { protoLog := &protogen.Log{ Timestamp: time.Now().UnixNano(), Fields: make([]*protogen.Field, len(logs)), @@ -98,15 +123,15 @@ func (s *span) setLog(logs ...LogField) Trace { } // Visit visits the k-v pair in trace, calling fn for each. -func (s *span) Visit(fn func(k, v string)) { +func (s *Span) Visit(fn func(k, v string)) { fn(KratosTraceID, s.context.String()) } // SetTitle reset trace title -func (s *span) SetTitle(operationName string) { +func (s *Span) SetTitle(operationName string) { s.operationName = operationName } -func (s *span) String() string { +func (s *Span) String() string { return s.context.String() } diff --git a/pkg/net/trace/span_test.go b/pkg/net/trace/span_test.go index 17a3cef4c..8d6b92085 100644 --- a/pkg/net/trace/span_test.go +++ b/pkg/net/trace/span_test.go @@ -12,16 +12,16 @@ import ( func TestSpan(t *testing.T) { report := &mockReport{} - t1 := newTracer("service1", report, &Config{DisableSample: true}) + t1 := NewTracer("service1", report, true) t.Run("test span string", func(t *testing.T) { - sp1 := t1.New("testfinish").(*span) + sp1 := t1.New("testfinish").(*Span) assert.NotEmpty(t, fmt.Sprint(sp1)) }) t.Run("test fork", func(t *testing.T) { - sp1 := t1.New("testfork").(*span) - sp2 := sp1.Fork("xxx", "opt_2").(*span) - assert.Equal(t, sp1.context.traceID, sp2.context.traceID) - assert.Equal(t, sp1.context.spanID, sp2.context.parentID) + sp1 := t1.New("testfork").(*Span) + sp2 := sp1.Fork("xxx", "opt_2").(*Span) + assert.Equal(t, sp1.context.TraceID, sp2.context.TraceID) + assert.Equal(t, sp1.context.SpanID, sp2.context.ParentID) t.Run("test max fork", func(t *testing.T) { sp3 := sp2.Fork("xx", "xxx") for i := 0; i < 100; i++ { @@ -39,14 +39,14 @@ func TestSpan(t *testing.T) { }) t.Run("test finish", func(t *testing.T) { t.Run("test finish ok", func(t *testing.T) { - sp1 := t1.New("testfinish").(*span) + sp1 := t1.New("testfinish").(*Span) time.Sleep(time.Millisecond) sp1.Finish(nil) assert.True(t, sp1.startTime.Unix() > 0) assert.True(t, sp1.duration > time.Microsecond) }) t.Run("test finish error", func(t *testing.T) { - sp1 := t1.New("testfinish").(*span) + sp1 := t1.New("testfinish").(*Span) time.Sleep(time.Millisecond) err := fmt.Errorf("🍻") sp1.Finish(&err) @@ -71,7 +71,7 @@ func TestSpan(t *testing.T) { assert.True(t, messageLog) }) t.Run("test finish error stack", func(t *testing.T) { - sp1 := t1.New("testfinish").(*span) + sp1 := t1.New("testfinish").(*Span) time.Sleep(time.Millisecond) err := fmt.Errorf("🍻") err = errors.WithStack(err) @@ -87,7 +87,7 @@ func TestSpan(t *testing.T) { assert.True(t, ok, "LogStack set") }) t.Run("test too many tags", func(t *testing.T) { - sp1 := t1.New("testfinish").(*span) + sp1 := t1.New("testfinish").(*Span) for i := 0; i < 1024; i++ { sp1.SetTag(Tag{Key: strconv.Itoa(i), Value: "hello"}) } @@ -96,7 +96,7 @@ func TestSpan(t *testing.T) { assert.Equal(t, sp1.tags[_maxTags].Value, "too many tags") }) t.Run("test too many logs", func(t *testing.T) { - sp1 := t1.New("testfinish").(*span) + sp1 := t1.New("testfinish").(*Span) for i := 0; i < 1024; i++ { sp1.SetLog(LogField{Key: strconv.Itoa(i), Value: "hello"}) } diff --git a/pkg/net/trace/util_test.go b/pkg/net/trace/util_test.go index c6223bd84..6d98908a1 100644 --- a/pkg/net/trace/util_test.go +++ b/pkg/net/trace/util_test.go @@ -9,7 +9,7 @@ import ( func TestFromContext(t *testing.T) { report := &mockReport{} - t1 := newTracer("service1", report, &Config{DisableSample: true}) + t1 := NewTracer("service1", report, true) sp1 := t1.New("test123") ctx := context.Background() ctx = NewContext(ctx, sp1) diff --git a/pkg/net/trace/zipkin/config.go b/pkg/net/trace/zipkin/config.go new file mode 100644 index 000000000..37e3037e8 --- /dev/null +++ b/pkg/net/trace/zipkin/config.go @@ -0,0 +1,30 @@ +package zipkin + +import ( + "time" + + "github.com/bilibili/kratos/pkg/conf/env" + "github.com/bilibili/kratos/pkg/net/trace" + xtime "github.com/bilibili/kratos/pkg/time" +) + +// Config config. +// url should be the endpoint to send the spans to, e.g. +// http://localhost:9411/api/v2/spans +type Config struct { + Endpoint string `dsn:"endpoint"` + BatchSize int `dsn:"query.batch_size,100"` + Timeout xtime.Duration `dsn:"query.timeout,200ms"` + DisableSample bool `dsn:"query.disable_sample"` +} + +// Init init trace report. +func Init(c *Config) { + if c.BatchSize == 0 { + c.BatchSize = 100 + } + if c.Timeout == 0 { + c.Timeout = xtime.Duration(200 * time.Millisecond) + } + trace.SetGlobalTracer(trace.NewTracer(env.AppID, newReport(c), c.DisableSample)) +} diff --git a/pkg/net/trace/zipkin/zipkin.go b/pkg/net/trace/zipkin/zipkin.go new file mode 100644 index 000000000..db6e1737f --- /dev/null +++ b/pkg/net/trace/zipkin/zipkin.go @@ -0,0 +1,79 @@ +package zipkin + +import ( + "fmt" + "time" + + "github.com/bilibili/kratos/pkg/net/trace" + "github.com/openzipkin/zipkin-go/model" + "github.com/openzipkin/zipkin-go/reporter" + "github.com/openzipkin/zipkin-go/reporter/http" +) + +type report struct { + rpt reporter.Reporter +} + +func newReport(c *Config) *report { + return &report{ + rpt: http.NewReporter(c.Endpoint, + http.Timeout(time.Duration(c.Timeout)), + http.BatchSize(c.BatchSize), + ), + } +} + +// WriteSpan write a trace span to queue. +func (r *report) WriteSpan(raw *trace.Span) (err error) { + ctx := raw.Context() + traceID := model.TraceID{Low: ctx.TraceID} + spanID := model.ID(ctx.SpanID) + parentID := model.ID(ctx.ParentID) + tags := raw.Tags() + logs := raw.Logs() + span := model.SpanModel{ + SpanContext: model.SpanContext{ + TraceID: traceID, + ID: spanID, + ParentID: &parentID, + }, + Name: raw.Name(), + Timestamp: raw.StartTime(), + Duration: raw.Duration(), + Tags: make(map[string]string, len(tags)+len(logs)), + } + for _, tag := range tags { + switch tag.Key { + case trace.TagSpanKind: + switch tag.Value.(string) { + case "client": + span.Kind = model.Client + case "server": + span.Kind = model.Server + case "producer": + span.Kind = model.Producer + case "consumer": + span.Kind = model.Consumer + } + case trace.TagPeerService: + span.LocalEndpoint = &model.Endpoint{ServiceName: tag.Value.(string)} + default: + v, ok := tag.Value.(string) + if ok { + span.Tags[tag.Key] = v + } else { + span.Tags[tag.Key] = fmt.Sprint(v) + } + } + } + for _, lg := range logs { + span.Tags[lg.Key] = string(lg.Value) + } + r.rpt.Send(span) + return +} + +// Close close the report. +func (r *report) Close() error { + return r.rpt.Close() +} diff --git a/pkg/net/trace/zipkin/zipkin_test.go b/pkg/net/trace/zipkin/zipkin_test.go new file mode 100644 index 000000000..94d0ea566 --- /dev/null +++ b/pkg/net/trace/zipkin/zipkin_test.go @@ -0,0 +1,52 @@ +package zipkin + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/bilibili/kratos/pkg/net/trace" + xtime "github.com/bilibili/kratos/pkg/time" +) + +func TestZipkin(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + t.Errorf("expected 'POST' request, got '%s'", r.Method) + } + + aSpanPayload, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Errorf("unexpected error: %s", err.Error()) + } + + t.Logf("%s\n", aSpanPayload) + })) + defer ts.Close() + + c := &Config{ + Endpoint: ts.URL, + Timeout: xtime.Duration(time.Second * 5), + BatchSize: 100, + } + //c.Endpoint = "http://127.0.0.1:9411/api/v2/spans" + report := newReport(c) + t1 := trace.NewTracer("service1", report, true) + t2 := trace.NewTracer("service2", report, true) + sp1 := t1.New("option_1") + sp2 := sp1.Fork("service3", "opt_client") + // inject + header := make(http.Header) + t1.Inject(sp2, trace.HTTPFormat, header) + t.Log(header) + sp3, err := t2.Extract(trace.HTTPFormat, header) + if err != nil { + t.Fatal(err) + } + sp3.Finish(nil) + sp2.Finish(nil) + sp1.Finish(nil) + report.Close() +}