From b1bb19aa8024864c38dc7e6bd5cb7a2e34887b12 Mon Sep 17 00:00:00 2001 From: Gustavo Silva Paiva Date: Fri, 4 Oct 2019 16:07:42 -0300 Subject: [PATCH] change jaeger options to functional style (#161) * change jaeger options to functional style * fix lints * add interface validaiton --- exporter/trace/jaeger/example/main.go | 10 +- exporter/trace/jaeger/jaeger.go | 136 ++++++++----------------- exporter/trace/jaeger/uploader.go | 141 ++++++++++++++++++++++++++ 3 files changed, 187 insertions(+), 100 deletions(-) create mode 100644 exporter/trace/jaeger/uploader.go diff --git a/exporter/trace/jaeger/example/main.go b/exporter/trace/jaeger/example/main.go index bea839b66..43d5acd8e 100644 --- a/exporter/trace/jaeger/example/main.go +++ b/exporter/trace/jaeger/example/main.go @@ -30,12 +30,12 @@ func main() { ctx := context.Background() // Create Jaeger Exporter - exporter, err := jaeger.NewExporter(jaeger.Options{ - CollectorEndpoint: "http://localhost:14268/api/traces", - Process: jaeger.Process{ + exporter, err := jaeger.NewExporter( + jaeger.WithCollectorEndpoint("http://localhost:14268/api/traces"), + jaeger.WithProcess(jaeger.Process{ ServiceName: "trace-demo", - }, - }) + }), + ) if err != nil { log.Fatal(err) } diff --git a/exporter/trace/jaeger/jaeger.go b/exporter/trace/jaeger/jaeger.go index b7621f68d..14a1f882c 100644 --- a/exporter/trace/jaeger/jaeger.go +++ b/exporter/trace/jaeger/jaeger.go @@ -15,15 +15,8 @@ package jaeger import ( - "bytes" - "errors" - "fmt" - "io" - "io/ioutil" "log" - "net/http" - "github.com/apache/thrift/lib/go/thrift" "google.golang.org/api/support/bundler" "google.golang.org/grpc/codes" @@ -34,30 +27,15 @@ import ( const defaultServiceName = "OpenTelemetry" -// Options are the options to be used when initializing a Jaeger exporter. -type Options struct { - // CollectorEndpoint is the full url to the Jaeger HTTP Thrift collector. - // For example, http://localhost:14268/api/traces - CollectorEndpoint string - - // AgentEndpoint instructs exporter to send spans to jaeger-agent at this address. - // For example, localhost:6831. - AgentEndpoint string +type Option func(*options) +// options are the options to be used when initializing a Jaeger exporter. +type options struct { // OnError is the hook to be called when there is - // an error occurred when uploading the stats data. + // an error occurred when uploading the span data. // If no custom hook is set, errors are logged. - // Optional. OnError func(err error) - // Username to be used if basic auth is required. - // Optional. - Username string - - // Password to be used if basic auth is required. - // Optional. - Password string - // Process contains the information about the exporting process. Process Process @@ -65,24 +43,42 @@ type Options struct { BufferMaxCount int } +// WithOnError sets the hook to be called when there is +// an error occurred when uploading the span data. +// If no custom hook is set, errors are logged. +func WithOnError(onError func(err error)) func(o *options) { + return func(o *options) { + o.OnError = onError + } +} + +// WithProcess sets the process with the information about the exporting process. +func WithProcess(process Process) func(o *options) { + return func(o *options) { + o.Process = process + } +} + +//WithBufferMaxCount defines the total number of traces that can be buffered in memory +func WithBufferMaxCount(bufferMaxCount int) func(o *options) { + return func(o *options) { + o.BufferMaxCount = bufferMaxCount + } +} + // NewExporter returns a trace.Exporter implementation that exports // the collected spans to Jaeger. -func NewExporter(o Options) (*Exporter, error) { - if o.CollectorEndpoint == "" && o.AgentEndpoint == "" { - return nil, errors.New("missing endpoint for Jaeger exporter") +func NewExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, error) { + uploader, err := endpointOption() + if err != nil { + return nil, err } - var endpoint string - var client *agentClientUDP - var err error - if o.CollectorEndpoint != "" { - endpoint = o.CollectorEndpoint - } else { - client, err = newAgentClientUDP(o.AgentEndpoint, udpPacketMaxLength) - if err != nil { - return nil, err - } + o := options{} + for _, opt := range opts { + opt(&o) } + onError := func(err error) { if o.OnError != nil { o.OnError(err) @@ -99,11 +95,7 @@ func NewExporter(o Options) (*Exporter, error) { tags[i] = attributeToTag(tag.key, tag.value) } e := &Exporter{ - endpoint: endpoint, - agentEndpoint: o.AgentEndpoint, - client: client, - username: o.Username, - password: o.Password, + uploader: uploader, process: &gen.Process{ ServiceName: service, Tags: tags, @@ -145,13 +137,9 @@ type Tag struct { // Exporter is an implementation of trace.Exporter that uploads spans to Jaeger. type Exporter struct { - endpoint string - agentEndpoint string - process *gen.Process - bundler *bundler.Bundler - client *agentClientUDP - - username, password string + process *gen.Process + bundler *bundler.Bundler + uploader batchUploader } var _ trace.Exporter = (*Exporter)(nil) @@ -328,48 +316,6 @@ func (e *Exporter) upload(spans []*gen.Span) error { Spans: spans, Process: e.process, } - if e.endpoint != "" { - return e.uploadCollector(batch) - } - return e.uploadAgent(batch) -} - -func (e *Exporter) uploadAgent(batch *gen.Batch) error { - return e.client.EmitBatch(batch) -} - -func (e *Exporter) uploadCollector(batch *gen.Batch) error { - body, err := serialize(batch) - if err != nil { - return err - } - req, err := http.NewRequest("POST", e.endpoint, body) - if err != nil { - return err - } - if e.username != "" && e.password != "" { - req.SetBasicAuth(e.username, e.password) - } - req.Header.Set("Content-Type", "application/x-thrift") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - - _, _ = io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode) - } - return nil -} - -func serialize(obj thrift.TStruct) (*bytes.Buffer, error) { - buf := thrift.NewTMemoryBuffer() - if err := obj.Write(thrift.NewTBinaryProtocolTransport(buf)); err != nil { - return nil, err - } - return buf.Buffer, nil + + return e.uploader.upload(batch) } diff --git a/exporter/trace/jaeger/uploader.go b/exporter/trace/jaeger/uploader.go new file mode 100644 index 000000000..0f6f26d2a --- /dev/null +++ b/exporter/trace/jaeger/uploader.go @@ -0,0 +1,141 @@ +package jaeger + +import ( + "bytes" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + + "github.com/apache/thrift/lib/go/thrift" + + gen "go.opentelemetry.io/exporter/trace/jaeger/internal/gen-go/jaeger" +) + +// batchUploader send a batch of spans to Jaeger +type batchUploader interface { + upload(batch *gen.Batch) error +} + +type EndpointOption func() (batchUploader, error) + +// WithAgentEndpoint instructs exporter to send spans to jaeger-agent at this address. +// For example, localhost:6831. +func WithAgentEndpoint(agentEndpoint string) func() (batchUploader, error) { + return func() (batchUploader, error) { + if agentEndpoint == "" { + return nil, errors.New("agentEndpoint must not be empty.") + } + + client, err := newAgentClientUDP(agentEndpoint, udpPacketMaxLength) + if err != nil { + return nil, err + } + + return &agentUploader{client: client}, nil + } +} + +// WithCollectorEndpoint defines the full url to the Jaeger HTTP Thrift collector. +// For example, http://localhost:14268/api/traces +func WithCollectorEndpoint(collectorEndpoint string, options ...CollectorEndpointOption) func() (batchUploader, error) { + return func() (batchUploader, error) { + if collectorEndpoint == "" { + return nil, errors.New("collectorEndpoint must not be empty.") + } + + o := &CollectorEndpointOptions{} + for _, opt := range options { + opt(o) + } + + return &collectorUploader{ + endpoint: collectorEndpoint, + username: o.username, + password: o.password, + }, nil + } +} + +type CollectorEndpointOption func(o *CollectorEndpointOptions) + +type CollectorEndpointOptions struct { + // username to be used if basic auth is required. + username string + + // password to be used if basic auth is required. + password string +} + +// WithUsername sets the username to be used if basic auth is required. +func WithUsername(username string) func(o *CollectorEndpointOptions) { + return func(o *CollectorEndpointOptions) { + o.username = username + } +} + +// WithPassword sets the password to be used if basic auth is required. +func WithPassword(password string) func(o *CollectorEndpointOptions) { + return func(o *CollectorEndpointOptions) { + o.password = password + } +} + +// agentUploader implements batchUploader interface sending batches to +// Jaeger through the UDP agent. +type agentUploader struct { + client *agentClientUDP +} + +var _ batchUploader = (*agentUploader)(nil) + +func (a *agentUploader) upload(batch *gen.Batch) error { + return a.client.EmitBatch(batch) +} + +// collectorUploader implements batchUploader interface sending batches to +// Jaeger through the collector http endpoint. +type collectorUploader struct { + endpoint string + username string + password string +} + +var _ batchUploader = (*collectorUploader)(nil) + +func (c *collectorUploader) upload(batch *gen.Batch) error { + body, err := serialize(batch) + if err != nil { + return err + } + req, err := http.NewRequest("POST", c.endpoint, body) + if err != nil { + return err + } + if c.username != "" && c.password != "" { + req.SetBasicAuth(c.username, c.password) + } + req.Header.Set("Content-Type", "application/x-thrift") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + _, _ = io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode) + } + return nil +} + +func serialize(obj thrift.TStruct) (*bytes.Buffer, error) { + buf := thrift.NewTMemoryBuffer() + if err := obj.Write(thrift.NewTBinaryProtocolTransport(buf)); err != nil { + return nil, err + } + return buf.Buffer, nil +}