mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-02-01 13:07:51 +02:00
change jaeger options to functional style (#161)
* change jaeger options to functional style * fix lints * add interface validaiton
This commit is contained in:
parent
bf2e5e9f09
commit
b1bb19aa80
@ -30,12 +30,12 @@ func main() {
|
||||
ctx := context.Background()
|
||||
|
||||
// Create Jaeger Exporter
|
||||
exporter, err := jaeger.NewExporter(jaeger.Options{
|
||||
CollectorEndpoint: "http://localhost:14268/api/traces",
|
||||
Process: jaeger.Process{
|
||||
exporter, err := jaeger.NewExporter(
|
||||
jaeger.WithCollectorEndpoint("http://localhost:14268/api/traces"),
|
||||
jaeger.WithProcess(jaeger.Process{
|
||||
ServiceName: "trace-demo",
|
||||
},
|
||||
})
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
@ -15,15 +15,8 @@
|
||||
package jaeger
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"github.com/apache/thrift/lib/go/thrift"
|
||||
"google.golang.org/api/support/bundler"
|
||||
"google.golang.org/grpc/codes"
|
||||
|
||||
@ -34,30 +27,15 @@ import (
|
||||
|
||||
const defaultServiceName = "OpenTelemetry"
|
||||
|
||||
// Options are the options to be used when initializing a Jaeger exporter.
|
||||
type Options struct {
|
||||
// CollectorEndpoint is the full url to the Jaeger HTTP Thrift collector.
|
||||
// For example, http://localhost:14268/api/traces
|
||||
CollectorEndpoint string
|
||||
|
||||
// AgentEndpoint instructs exporter to send spans to jaeger-agent at this address.
|
||||
// For example, localhost:6831.
|
||||
AgentEndpoint string
|
||||
type Option func(*options)
|
||||
|
||||
// options are the options to be used when initializing a Jaeger exporter.
|
||||
type options struct {
|
||||
// OnError is the hook to be called when there is
|
||||
// an error occurred when uploading the stats data.
|
||||
// an error occurred when uploading the span data.
|
||||
// If no custom hook is set, errors are logged.
|
||||
// Optional.
|
||||
OnError func(err error)
|
||||
|
||||
// Username to be used if basic auth is required.
|
||||
// Optional.
|
||||
Username string
|
||||
|
||||
// Password to be used if basic auth is required.
|
||||
// Optional.
|
||||
Password string
|
||||
|
||||
// Process contains the information about the exporting process.
|
||||
Process Process
|
||||
|
||||
@ -65,24 +43,42 @@ type Options struct {
|
||||
BufferMaxCount int
|
||||
}
|
||||
|
||||
// WithOnError sets the hook to be called when there is
|
||||
// an error occurred when uploading the span data.
|
||||
// If no custom hook is set, errors are logged.
|
||||
func WithOnError(onError func(err error)) func(o *options) {
|
||||
return func(o *options) {
|
||||
o.OnError = onError
|
||||
}
|
||||
}
|
||||
|
||||
// WithProcess sets the process with the information about the exporting process.
|
||||
func WithProcess(process Process) func(o *options) {
|
||||
return func(o *options) {
|
||||
o.Process = process
|
||||
}
|
||||
}
|
||||
|
||||
//WithBufferMaxCount defines the total number of traces that can be buffered in memory
|
||||
func WithBufferMaxCount(bufferMaxCount int) func(o *options) {
|
||||
return func(o *options) {
|
||||
o.BufferMaxCount = bufferMaxCount
|
||||
}
|
||||
}
|
||||
|
||||
// NewExporter returns a trace.Exporter implementation that exports
|
||||
// the collected spans to Jaeger.
|
||||
func NewExporter(o Options) (*Exporter, error) {
|
||||
if o.CollectorEndpoint == "" && o.AgentEndpoint == "" {
|
||||
return nil, errors.New("missing endpoint for Jaeger exporter")
|
||||
func NewExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, error) {
|
||||
uploader, err := endpointOption()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var endpoint string
|
||||
var client *agentClientUDP
|
||||
var err error
|
||||
if o.CollectorEndpoint != "" {
|
||||
endpoint = o.CollectorEndpoint
|
||||
} else {
|
||||
client, err = newAgentClientUDP(o.AgentEndpoint, udpPacketMaxLength)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
o := options{}
|
||||
for _, opt := range opts {
|
||||
opt(&o)
|
||||
}
|
||||
|
||||
onError := func(err error) {
|
||||
if o.OnError != nil {
|
||||
o.OnError(err)
|
||||
@ -99,11 +95,7 @@ func NewExporter(o Options) (*Exporter, error) {
|
||||
tags[i] = attributeToTag(tag.key, tag.value)
|
||||
}
|
||||
e := &Exporter{
|
||||
endpoint: endpoint,
|
||||
agentEndpoint: o.AgentEndpoint,
|
||||
client: client,
|
||||
username: o.Username,
|
||||
password: o.Password,
|
||||
uploader: uploader,
|
||||
process: &gen.Process{
|
||||
ServiceName: service,
|
||||
Tags: tags,
|
||||
@ -145,13 +137,9 @@ type Tag struct {
|
||||
|
||||
// Exporter is an implementation of trace.Exporter that uploads spans to Jaeger.
|
||||
type Exporter struct {
|
||||
endpoint string
|
||||
agentEndpoint string
|
||||
process *gen.Process
|
||||
bundler *bundler.Bundler
|
||||
client *agentClientUDP
|
||||
|
||||
username, password string
|
||||
process *gen.Process
|
||||
bundler *bundler.Bundler
|
||||
uploader batchUploader
|
||||
}
|
||||
|
||||
var _ trace.Exporter = (*Exporter)(nil)
|
||||
@ -328,48 +316,6 @@ func (e *Exporter) upload(spans []*gen.Span) error {
|
||||
Spans: spans,
|
||||
Process: e.process,
|
||||
}
|
||||
if e.endpoint != "" {
|
||||
return e.uploadCollector(batch)
|
||||
}
|
||||
return e.uploadAgent(batch)
|
||||
}
|
||||
|
||||
func (e *Exporter) uploadAgent(batch *gen.Batch) error {
|
||||
return e.client.EmitBatch(batch)
|
||||
}
|
||||
|
||||
func (e *Exporter) uploadCollector(batch *gen.Batch) error {
|
||||
body, err := serialize(batch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req, err := http.NewRequest("POST", e.endpoint, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if e.username != "" && e.password != "" {
|
||||
req.SetBasicAuth(e.username, e.password)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/x-thrift")
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, _ = io.Copy(ioutil.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func serialize(obj thrift.TStruct) (*bytes.Buffer, error) {
|
||||
buf := thrift.NewTMemoryBuffer()
|
||||
if err := obj.Write(thrift.NewTBinaryProtocolTransport(buf)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buf.Buffer, nil
|
||||
|
||||
return e.uploader.upload(batch)
|
||||
}
|
||||
|
141
exporter/trace/jaeger/uploader.go
Normal file
141
exporter/trace/jaeger/uploader.go
Normal file
@ -0,0 +1,141 @@
|
||||
package jaeger
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
"github.com/apache/thrift/lib/go/thrift"
|
||||
|
||||
gen "go.opentelemetry.io/exporter/trace/jaeger/internal/gen-go/jaeger"
|
||||
)
|
||||
|
||||
// batchUploader send a batch of spans to Jaeger
|
||||
type batchUploader interface {
|
||||
upload(batch *gen.Batch) error
|
||||
}
|
||||
|
||||
type EndpointOption func() (batchUploader, error)
|
||||
|
||||
// WithAgentEndpoint instructs exporter to send spans to jaeger-agent at this address.
|
||||
// For example, localhost:6831.
|
||||
func WithAgentEndpoint(agentEndpoint string) func() (batchUploader, error) {
|
||||
return func() (batchUploader, error) {
|
||||
if agentEndpoint == "" {
|
||||
return nil, errors.New("agentEndpoint must not be empty.")
|
||||
}
|
||||
|
||||
client, err := newAgentClientUDP(agentEndpoint, udpPacketMaxLength)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &agentUploader{client: client}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithCollectorEndpoint defines the full url to the Jaeger HTTP Thrift collector.
|
||||
// For example, http://localhost:14268/api/traces
|
||||
func WithCollectorEndpoint(collectorEndpoint string, options ...CollectorEndpointOption) func() (batchUploader, error) {
|
||||
return func() (batchUploader, error) {
|
||||
if collectorEndpoint == "" {
|
||||
return nil, errors.New("collectorEndpoint must not be empty.")
|
||||
}
|
||||
|
||||
o := &CollectorEndpointOptions{}
|
||||
for _, opt := range options {
|
||||
opt(o)
|
||||
}
|
||||
|
||||
return &collectorUploader{
|
||||
endpoint: collectorEndpoint,
|
||||
username: o.username,
|
||||
password: o.password,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
type CollectorEndpointOption func(o *CollectorEndpointOptions)
|
||||
|
||||
type CollectorEndpointOptions struct {
|
||||
// username to be used if basic auth is required.
|
||||
username string
|
||||
|
||||
// password to be used if basic auth is required.
|
||||
password string
|
||||
}
|
||||
|
||||
// WithUsername sets the username to be used if basic auth is required.
|
||||
func WithUsername(username string) func(o *CollectorEndpointOptions) {
|
||||
return func(o *CollectorEndpointOptions) {
|
||||
o.username = username
|
||||
}
|
||||
}
|
||||
|
||||
// WithPassword sets the password to be used if basic auth is required.
|
||||
func WithPassword(password string) func(o *CollectorEndpointOptions) {
|
||||
return func(o *CollectorEndpointOptions) {
|
||||
o.password = password
|
||||
}
|
||||
}
|
||||
|
||||
// agentUploader implements batchUploader interface sending batches to
|
||||
// Jaeger through the UDP agent.
|
||||
type agentUploader struct {
|
||||
client *agentClientUDP
|
||||
}
|
||||
|
||||
var _ batchUploader = (*agentUploader)(nil)
|
||||
|
||||
func (a *agentUploader) upload(batch *gen.Batch) error {
|
||||
return a.client.EmitBatch(batch)
|
||||
}
|
||||
|
||||
// collectorUploader implements batchUploader interface sending batches to
|
||||
// Jaeger through the collector http endpoint.
|
||||
type collectorUploader struct {
|
||||
endpoint string
|
||||
username string
|
||||
password string
|
||||
}
|
||||
|
||||
var _ batchUploader = (*collectorUploader)(nil)
|
||||
|
||||
func (c *collectorUploader) upload(batch *gen.Batch) error {
|
||||
body, err := serialize(batch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req, err := http.NewRequest("POST", c.endpoint, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if c.username != "" && c.password != "" {
|
||||
req.SetBasicAuth(c.username, c.password)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/x-thrift")
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, _ = io.Copy(ioutil.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func serialize(obj thrift.TStruct) (*bytes.Buffer, error) {
|
||||
buf := thrift.NewTMemoryBuffer()
|
||||
if err := obj.Write(thrift.NewTBinaryProtocolTransport(buf)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buf.Buffer, nil
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user