mirror of
https://github.com/go-micro/go-micro.git
synced 2024-12-24 10:07:04 +02:00
commit
de3c3b27b2
@ -10,6 +10,7 @@ import (
|
||||
"github.com/micro/go-micro/broker"
|
||||
"github.com/micro/go-micro/client"
|
||||
"github.com/micro/go-micro/client/selector"
|
||||
"github.com/micro/go-micro/debug/trace"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/runtime"
|
||||
"github.com/micro/go-micro/server"
|
||||
@ -22,9 +23,9 @@ import (
|
||||
cmucp "github.com/micro/go-micro/client/mucp"
|
||||
|
||||
// servers
|
||||
"github.com/micro/cli/v2"
|
||||
sgrpc "github.com/micro/go-micro/server/grpc"
|
||||
smucp "github.com/micro/go-micro/server/mucp"
|
||||
"github.com/micro/cli/v2"
|
||||
|
||||
// brokers
|
||||
"github.com/micro/go-micro/broker/memory"
|
||||
@ -50,6 +51,10 @@ import (
|
||||
// stores
|
||||
memStore "github.com/micro/go-micro/store/memory"
|
||||
svcStore "github.com/micro/go-micro/store/service"
|
||||
|
||||
// tracers
|
||||
// jTracer "github.com/micro/go-micro/debug/trace/jaeger"
|
||||
memTracer "github.com/micro/go-micro/debug/trace/memory"
|
||||
)
|
||||
|
||||
type Cmd interface {
|
||||
@ -208,6 +213,16 @@ var (
|
||||
EnvVars: []string{"MICRO_TRANSPORT_ADDRESS"},
|
||||
Usage: "Comma-separated list of transport addresses",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "tracer",
|
||||
EnvVars: []string{"MICRO_TRACER"},
|
||||
Usage: "Tracer for distributed tracing, e.g. memory, jaeger",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "tracer_address",
|
||||
EnvVars: []string{"MICRO_TRACER_ADDRESS"},
|
||||
Usage: "Comma-separated list of tracer addresses",
|
||||
},
|
||||
}
|
||||
|
||||
DefaultBrokers = map[string]func(...broker.Option) broker.Broker{
|
||||
@ -254,6 +269,11 @@ var (
|
||||
"service": svcStore.NewStore,
|
||||
}
|
||||
|
||||
DefaultTracers = map[string]func(...trace.Option) trace.Tracer{
|
||||
"memory": memTracer.NewTracer,
|
||||
// "jaeger": jTracer.NewTracer,
|
||||
}
|
||||
|
||||
// used for default selection as the fall back
|
||||
defaultClient = "grpc"
|
||||
defaultServer = "grpc"
|
||||
@ -279,6 +299,7 @@ func newCmd(opts ...Option) Cmd {
|
||||
Transport: &transport.DefaultTransport,
|
||||
Runtime: &runtime.DefaultRuntime,
|
||||
Store: &store.DefaultStore,
|
||||
Tracer: &trace.DefaultTracer,
|
||||
|
||||
Brokers: DefaultBrokers,
|
||||
Clients: DefaultClients,
|
||||
@ -288,6 +309,7 @@ func newCmd(opts ...Option) Cmd {
|
||||
Transports: DefaultTransports,
|
||||
Runtimes: DefaultRuntimes,
|
||||
Stores: DefaultStores,
|
||||
Tracers: DefaultTracers,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
@ -330,7 +352,7 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
var serverOpts []server.Option
|
||||
var clientOpts []client.Option
|
||||
|
||||
// Set the runtime
|
||||
// Set the store
|
||||
if name := ctx.String("store"); len(name) > 0 {
|
||||
s, ok := c.opts.Stores[name]
|
||||
if !ok {
|
||||
@ -350,6 +372,16 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
*c.opts.Runtime = r()
|
||||
}
|
||||
|
||||
// Set the tracer
|
||||
if name := ctx.String("tracer"); len(name) > 0 {
|
||||
r, ok := c.opts.Tracers[name]
|
||||
if !ok {
|
||||
return fmt.Errorf("Unsupported tracer: %s", name)
|
||||
}
|
||||
|
||||
*c.opts.Tracer = r()
|
||||
}
|
||||
|
||||
// Set the client
|
||||
if name := ctx.String("client"); len(name) > 0 {
|
||||
// only change if we have the client and type differs
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"github.com/micro/go-micro/broker"
|
||||
"github.com/micro/go-micro/client"
|
||||
"github.com/micro/go-micro/client/selector"
|
||||
"github.com/micro/go-micro/debug/trace"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/runtime"
|
||||
"github.com/micro/go-micro/server"
|
||||
@ -28,6 +29,7 @@ type Options struct {
|
||||
Server *server.Server
|
||||
Runtime *runtime.Runtime
|
||||
Store *store.Store
|
||||
Tracer *trace.Tracer
|
||||
|
||||
Brokers map[string]func(...broker.Option) broker.Broker
|
||||
Clients map[string]func(...client.Option) client.Client
|
||||
@ -37,6 +39,7 @@ type Options struct {
|
||||
Transports map[string]func(...transport.Option) transport.Transport
|
||||
Runtimes map[string]func(...runtime.Option) runtime.Runtime
|
||||
Stores map[string]func(...store.Option) store.Store
|
||||
Tracers map[string]func(...trace.Option) trace.Tracer
|
||||
|
||||
// Other options for implementations of the interface
|
||||
// can be stored in a context
|
||||
@ -100,6 +103,12 @@ func Server(s *server.Server) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func Tracer(t *trace.Tracer) Option {
|
||||
return func(o *Options) {
|
||||
o.Tracer = t
|
||||
}
|
||||
}
|
||||
|
||||
// New broker func
|
||||
func NewBroker(name string, b func(...broker.Option) broker.Broker) Option {
|
||||
return func(o *Options) {
|
||||
@ -148,3 +157,10 @@ func NewRuntime(name string, r func(...runtime.Option) runtime.Runtime) Option {
|
||||
o.Runtimes[name] = r
|
||||
}
|
||||
}
|
||||
|
||||
// New tracer func
|
||||
func NewTracer(name string, t func(...trace.Option) trace.Tracer) Option {
|
||||
return func(o *Options) {
|
||||
o.Tracers[name] = t
|
||||
}
|
||||
}
|
||||
|
@ -12,10 +12,14 @@ import (
|
||||
"github.com/micro/go-micro/server"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultHandler is default debug handler
|
||||
DefaultHandler = newDebug()
|
||||
)
|
||||
// NewHandler returns an instance of the Debug Handler
|
||||
func NewHandler(srv server.Server) *Debug {
|
||||
return &Debug{
|
||||
log: log.DefaultLog,
|
||||
stats: stats.DefaultStats,
|
||||
trace: srv.Options().Tracer,
|
||||
}
|
||||
}
|
||||
|
||||
type Debug struct {
|
||||
// must honour the debug handler
|
||||
@ -25,15 +29,7 @@ type Debug struct {
|
||||
// the stats collector
|
||||
stats stats.Stats
|
||||
// the tracer
|
||||
trace trace.Trace
|
||||
}
|
||||
|
||||
func newDebug() *Debug {
|
||||
return &Debug{
|
||||
log: log.DefaultLog,
|
||||
stats: stats.DefaultStats,
|
||||
trace: trace.DefaultTrace,
|
||||
}
|
||||
trace trace.Tracer
|
||||
}
|
||||
|
||||
func (d *Debug) Health(ctx context.Context, req *proto.HealthRequest, rsp *proto.HealthResponse) error {
|
||||
|
@ -1,32 +1,33 @@
|
||||
package trace
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/micro/go-micro/debug/trace"
|
||||
"github.com/micro/go-micro/util/ring"
|
||||
)
|
||||
|
||||
type trace struct {
|
||||
opts Options
|
||||
type Tracer struct {
|
||||
opts trace.Options
|
||||
|
||||
// ring buffer of traces
|
||||
buffer *ring.Buffer
|
||||
}
|
||||
|
||||
func (t *trace) Read(opts ...ReadOption) ([]*Span, error) {
|
||||
var options ReadOptions
|
||||
func (t *Tracer) Read(opts ...trace.ReadOption) ([]*trace.Span, error) {
|
||||
var options trace.ReadOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
sp := t.buffer.Get(t.buffer.Size())
|
||||
|
||||
var spans []*Span
|
||||
var spans []*trace.Span
|
||||
|
||||
for _, span := range sp {
|
||||
val := span.Value.(*Span)
|
||||
val := span.Value.(*trace.Span)
|
||||
// skip if trace id is specified and doesn't match
|
||||
if len(options.Trace) > 0 && val.Trace != options.Trace {
|
||||
continue
|
||||
@ -37,8 +38,8 @@ func (t *trace) Read(opts ...ReadOption) ([]*Span, error) {
|
||||
return spans, nil
|
||||
}
|
||||
|
||||
func (t *trace) Start(ctx context.Context, name string) (context.Context, *Span) {
|
||||
span := &Span{
|
||||
func (t *Tracer) Start(ctx context.Context, name string) (context.Context, *trace.Span) {
|
||||
span := &trace.Span{
|
||||
Name: name,
|
||||
Trace: uuid.New().String(),
|
||||
Id: uuid.New().String(),
|
||||
@ -51,7 +52,7 @@ func (t *trace) Start(ctx context.Context, name string) (context.Context, *Span)
|
||||
return context.Background(), span
|
||||
}
|
||||
|
||||
s, ok := FromContext(ctx)
|
||||
s, ok := trace.FromContext(ctx)
|
||||
if !ok {
|
||||
return ctx, span
|
||||
}
|
||||
@ -65,7 +66,7 @@ func (t *trace) Start(ctx context.Context, name string) (context.Context, *Span)
|
||||
return ctx, span
|
||||
}
|
||||
|
||||
func (t *trace) Finish(s *Span) error {
|
||||
func (t *Tracer) Finish(s *trace.Span) error {
|
||||
// set finished time
|
||||
s.Duration = time.Since(s.Started)
|
||||
|
||||
@ -75,13 +76,13 @@ func (t *trace) Finish(s *Span) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewTrace(opts ...Option) Trace {
|
||||
var options Options
|
||||
func NewTracer(opts ...trace.Option) trace.Tracer {
|
||||
var options trace.Options
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
return &trace{
|
||||
return &Tracer{
|
||||
opts: options,
|
||||
// the last 64 requests
|
||||
buffer: ring.New(64),
|
@ -1,6 +1,9 @@
|
||||
package trace
|
||||
|
||||
type Options struct{}
|
||||
type Options struct {
|
||||
// Size is the size of ring buffer
|
||||
Size int
|
||||
}
|
||||
|
||||
type Option func(o *Options)
|
||||
|
||||
@ -17,3 +20,15 @@ func ReadTrace(t string) ReadOption {
|
||||
o.Trace = t
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
// DefaultSize of the buffer
|
||||
DefaultSize = 64
|
||||
)
|
||||
|
||||
// DefaultOptions returns default options
|
||||
func DefaultOptions() Options {
|
||||
return Options{
|
||||
Size: DefaultSize,
|
||||
}
|
||||
}
|
||||
|
@ -6,8 +6,8 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Trace is an interface for distributed tracing
|
||||
type Trace interface {
|
||||
// Tracer is an interface for distributed tracing
|
||||
type Tracer interface {
|
||||
// Start a trace
|
||||
Start(ctx context.Context, name string) (context.Context, *Span)
|
||||
// Finish the trace
|
||||
@ -36,11 +36,6 @@ type Span struct {
|
||||
|
||||
type spanKey struct{}
|
||||
|
||||
var (
|
||||
// Default tracer
|
||||
DefaultTrace = NewTrace()
|
||||
)
|
||||
|
||||
// FromContext returns a span from context
|
||||
func FromContext(ctx context.Context) (*Span, bool) {
|
||||
s, ok := ctx.Value(spanKey{}).(*Span)
|
||||
@ -51,3 +46,25 @@ func FromContext(ctx context.Context) (*Span, bool) {
|
||||
func NewContext(ctx context.Context, s *Span) context.Context {
|
||||
return context.WithValue(ctx, spanKey{}, s)
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultTracer Tracer = new(noop)
|
||||
)
|
||||
|
||||
type noop struct{}
|
||||
|
||||
func (n *noop) Init(...Option) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *noop) Start(ctx context.Context, name string) (context.Context, *Span) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (n *noop) Finish(*Span) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *noop) Read(...ReadOption) ([]*Span, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
10
options.go
10
options.go
@ -4,14 +4,15 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/micro/cli/v2"
|
||||
"github.com/micro/go-micro/broker"
|
||||
"github.com/micro/go-micro/client"
|
||||
"github.com/micro/go-micro/client/selector"
|
||||
"github.com/micro/go-micro/config/cmd"
|
||||
"github.com/micro/go-micro/debug/trace"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/server"
|
||||
"github.com/micro/go-micro/transport"
|
||||
"github.com/micro/cli/v2"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
@ -112,6 +113,13 @@ func Registry(r registry.Registry) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Tracer sets the tracer for the service
|
||||
func Tracer(t trace.Tracer) Option {
|
||||
return func(o *Options) {
|
||||
o.Server.Init(server.Tracer(t))
|
||||
}
|
||||
}
|
||||
|
||||
// Selector sets the selector for the service client
|
||||
func Selector(s selector.Selector) Option {
|
||||
return func(o *Options) {
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/micro/go-micro/broker"
|
||||
"github.com/micro/go-micro/codec"
|
||||
"github.com/micro/go-micro/debug/trace"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/transport"
|
||||
)
|
||||
@ -15,6 +16,7 @@ type Options struct {
|
||||
Codecs map[string]codec.NewCodec
|
||||
Broker broker.Broker
|
||||
Registry registry.Registry
|
||||
Tracer trace.Tracer
|
||||
Transport transport.Transport
|
||||
Metadata map[string]string
|
||||
Name string
|
||||
@ -152,6 +154,13 @@ func Registry(r registry.Registry) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Tracer mechanism for distributed tracking
|
||||
func Tracer(t trace.Tracer) Option {
|
||||
return func(o *Options) {
|
||||
o.Tracer = t
|
||||
}
|
||||
}
|
||||
|
||||
// Transport mechanism for communication e.g http, rabbitmq, etc
|
||||
func Transport(t transport.Transport) Option {
|
||||
return func(o *Options) {
|
||||
|
10
service.go
10
service.go
@ -15,7 +15,6 @@ import (
|
||||
"github.com/micro/go-micro/debug/profile/pprof"
|
||||
"github.com/micro/go-micro/debug/service/handler"
|
||||
"github.com/micro/go-micro/debug/stats"
|
||||
"github.com/micro/go-micro/debug/trace"
|
||||
"github.com/micro/go-micro/plugin"
|
||||
"github.com/micro/go-micro/server"
|
||||
"github.com/micro/go-micro/util/log"
|
||||
@ -36,17 +35,14 @@ func newService(opts ...Option) Service {
|
||||
|
||||
// wrap client to inject From-Service header on any calls
|
||||
options.Client = wrapper.FromService(serviceName, options.Client)
|
||||
|
||||
// wrap client to inject From-Service header on any calls
|
||||
options.Client = wrapper.TraceCall(serviceName, trace.DefaultTrace, options.Client)
|
||||
options.Client = wrapper.TraceCall(serviceName, options.Server.Options().Tracer, options.Client)
|
||||
|
||||
// wrap the server to provide handler stats
|
||||
options.Server.Init(
|
||||
server.WrapHandler(wrapper.HandlerStats(stats.DefaultStats)),
|
||||
server.WrapHandler(wrapper.TraceHandler(trace.DefaultTrace)),
|
||||
server.WrapHandler(wrapper.TraceHandler(options.Server.Options().Tracer)),
|
||||
)
|
||||
|
||||
|
||||
return &service{
|
||||
opts: options,
|
||||
}
|
||||
@ -162,7 +158,7 @@ func (s *service) Run() error {
|
||||
// register the debug handler
|
||||
s.opts.Server.Handle(
|
||||
s.opts.Server.NewHandler(
|
||||
handler.DefaultHandler,
|
||||
handler.NewHandler(s.Options().Server),
|
||||
server.InternalHandler(true),
|
||||
),
|
||||
)
|
||||
|
@ -19,7 +19,7 @@ type traceWrapper struct {
|
||||
client.Client
|
||||
|
||||
name string
|
||||
trace trace.Trace
|
||||
trace trace.Tracer
|
||||
}
|
||||
|
||||
var (
|
||||
@ -97,7 +97,7 @@ func HandlerStats(stats stats.Stats) server.HandlerWrapper {
|
||||
}
|
||||
|
||||
// TraceCall is a call tracing wrapper
|
||||
func TraceCall(name string, t trace.Trace, c client.Client) client.Client {
|
||||
func TraceCall(name string, t trace.Tracer, c client.Client) client.Client {
|
||||
return &traceWrapper{
|
||||
name: name,
|
||||
trace: t,
|
||||
@ -106,7 +106,7 @@ func TraceCall(name string, t trace.Trace, c client.Client) client.Client {
|
||||
}
|
||||
|
||||
// TraceHandler wraps a server handler to perform tracing
|
||||
func TraceHandler(t trace.Trace) server.HandlerWrapper {
|
||||
func TraceHandler(t trace.Tracer) server.HandlerWrapper {
|
||||
// return a handler wrapper
|
||||
return func(h server.HandlerFunc) server.HandlerFunc {
|
||||
// return a function that returns a function
|
||||
|
Loading…
Reference in New Issue
Block a user