2020-03-23 22:41:10 -07:00
|
|
|
// Copyright The OpenTelemetry Authors
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2020-11-04 18:10:58 +01:00
|
|
|
package jaeger // import "go.opentelemetry.io/otel/exporters/trace/jaeger"
|
2019-10-04 16:07:42 -03:00
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
2021-03-22 14:12:56 -05:00
|
|
|
"context"
|
2019-10-04 16:07:42 -03:00
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"io/ioutil"
|
2020-09-01 12:08:11 -04:00
|
|
|
"log"
|
2019-10-04 16:07:42 -03:00
|
|
|
"net/http"
|
2020-09-01 12:08:11 -04:00
|
|
|
"time"
|
2019-10-04 16:07:42 -03:00
|
|
|
|
2021-02-19 13:12:35 -05:00
|
|
|
"go.opentelemetry.io/otel/exporters/trace/jaeger/internal/third_party/thrift/lib/go/thrift"
|
2019-10-04 16:07:42 -03:00
|
|
|
|
2020-03-02 13:54:57 -08:00
|
|
|
gen "go.opentelemetry.io/otel/exporters/trace/jaeger/internal/gen-go/jaeger"
|
2019-10-04 16:07:42 -03:00
|
|
|
)
|
|
|
|
|
|
|
|
// batchUploader send a batch of spans to Jaeger
|
|
|
|
type batchUploader interface {
|
2021-04-22 14:47:27 +00:00
|
|
|
upload(context.Context, *gen.Batch) error
|
|
|
|
shutdown(context.Context) error
|
2019-10-04 16:07:42 -03:00
|
|
|
}
|
|
|
|
|
|
|
|
// EndpointOption is a factory that builds the batchUploader used to deliver
// spans, or returns an error if the uploader cannot be created. See
// WithAgentEndpoint and WithCollectorEndpoint.
type EndpointOption func() (batchUploader, error)
|
|
|
|
|
2021-04-09 09:56:26 -07:00
|
|
|
// WithAgentEndpoint configures the Jaeger exporter to send spans to a jaeger-agent. This will
|
|
|
|
// use the following environment variables for configuration if no explicit option is provided:
|
|
|
|
//
|
|
|
|
// - OTEL_EXPORTER_JAEGER_AGENT_HOST is used for the agent address host
|
|
|
|
// - OTEL_EXPORTER_JAEGER_AGENT_PORT is used for the agent address port
|
|
|
|
//
|
|
|
|
// The passed options will take precedence over any environment variables and default values
|
|
|
|
// will be used if neither are provided.
|
|
|
|
func WithAgentEndpoint(options ...AgentEndpointOption) EndpointOption {
|
2019-10-04 16:07:42 -03:00
|
|
|
return func() (batchUploader, error) {
|
2020-09-01 12:08:11 -04:00
|
|
|
o := &AgentEndpointOptions{
|
|
|
|
agentClientUDPParams{
|
|
|
|
AttemptReconnecting: true,
|
2021-04-09 09:56:26 -07:00
|
|
|
Host: envOr(envAgentHost, "localhost"),
|
|
|
|
Port: envOr(envAgentPort, "6832"),
|
2020-09-01 12:08:11 -04:00
|
|
|
},
|
|
|
|
}
|
|
|
|
for _, opt := range options {
|
|
|
|
opt(o)
|
|
|
|
}
|
|
|
|
|
|
|
|
client, err := newAgentClientUDP(o.agentClientUDPParams)
|
2019-10-04 16:07:42 -03:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return &agentUploader{client: client}, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-01 12:08:11 -04:00
|
|
|
// AgentEndpointOption mutates the AgentEndpointOptions used by
// WithAgentEndpoint to build the agent client.
type AgentEndpointOption func(o *AgentEndpointOptions)
|
|
|
|
|
|
|
|
type AgentEndpointOptions struct {
|
|
|
|
agentClientUDPParams
|
|
|
|
}
|
|
|
|
|
2021-04-09 09:56:26 -07:00
|
|
|
// WithAgentHost sets a host to be used in the agent client endpoint.
|
|
|
|
// This option overrides any value set for the
|
|
|
|
// OTEL_EXPORTER_JAEGER_AGENT_HOST environment variable.
|
|
|
|
// If this option is not passed and the env var is not set, "localhost" will be used by default.
|
|
|
|
func WithAgentHost(host string) AgentEndpointOption {
|
|
|
|
return func(o *AgentEndpointOptions) {
|
|
|
|
o.Host = host
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// WithAgentPort sets a port to be used in the agent client endpoint.
|
|
|
|
// This option overrides any value set for the
|
|
|
|
// OTEL_EXPORTER_JAEGER_AGENT_PORT environment variable.
|
|
|
|
// If this option is not passed and the env var is not set, "6832" will be used by default.
|
|
|
|
func WithAgentPort(port string) AgentEndpointOption {
|
|
|
|
return func(o *AgentEndpointOptions) {
|
|
|
|
o.Port = port
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-01 12:08:11 -04:00
|
|
|
// WithLogger sets a logger to be used by agent client.
|
|
|
|
func WithLogger(logger *log.Logger) AgentEndpointOption {
|
|
|
|
return func(o *AgentEndpointOptions) {
|
|
|
|
o.Logger = logger
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// WithDisableAttemptReconnecting sets option to disable reconnecting udp client.
|
|
|
|
func WithDisableAttemptReconnecting() AgentEndpointOption {
|
|
|
|
return func(o *AgentEndpointOptions) {
|
|
|
|
o.AttemptReconnecting = false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// WithAttemptReconnectingInterval sets the interval between attempts to re resolve agent endpoint.
|
|
|
|
func WithAttemptReconnectingInterval(interval time.Duration) AgentEndpointOption {
|
|
|
|
return func(o *AgentEndpointOptions) {
|
|
|
|
o.AttemptReconnectInterval = interval
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-04-21 12:02:06 -07:00
|
|
|
// WithCollectorEndpoint defines the full url to the Jaeger HTTP Thrift collector. This will
|
|
|
|
// use the following environment variables for configuration if no explicit option is provided:
|
|
|
|
//
|
|
|
|
// - OTEL_EXPORTER_JAEGER_ENDPOINT is the HTTP endpoint for sending spans directly to a collector.
|
|
|
|
// - OTEL_EXPORTER_JAEGER_USER is the username to be sent as authentication to the collector endpoint.
|
|
|
|
// - OTEL_EXPORTER_JAEGER_PASSWORD is the password to be sent as authentication to the collector endpoint.
|
|
|
|
//
|
|
|
|
// The passed options will take precedence over any environment variables.
|
|
|
|
// If neither values are provided for the endpoint, the default value of "http://localhost:14250" will be used.
|
|
|
|
// If neither values are provided for the username or the password, they will not be set since there is no default.
|
|
|
|
func WithCollectorEndpoint(options ...CollectorEndpointOption) EndpointOption {
|
2019-10-04 16:07:42 -03:00
|
|
|
return func() (batchUploader, error) {
|
2020-04-28 21:35:34 +03:00
|
|
|
o := &CollectorEndpointOptions{
|
2021-04-21 12:02:06 -07:00
|
|
|
endpoint: envOr(envEndpoint, "http://localhost:14250"),
|
|
|
|
username: envOr(envUser, ""),
|
|
|
|
password: envOr(envPassword, ""),
|
2020-04-28 21:35:34 +03:00
|
|
|
httpClient: http.DefaultClient,
|
|
|
|
}
|
2020-06-06 16:42:24 +08:00
|
|
|
|
2019-10-04 16:07:42 -03:00
|
|
|
for _, opt := range options {
|
|
|
|
opt(o)
|
|
|
|
}
|
|
|
|
|
|
|
|
return &collectorUploader{
|
2021-04-21 12:02:06 -07:00
|
|
|
endpoint: o.endpoint,
|
2020-04-28 21:35:34 +03:00
|
|
|
username: o.username,
|
|
|
|
password: o.password,
|
|
|
|
httpClient: o.httpClient,
|
2019-10-04 16:07:42 -03:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type CollectorEndpointOption func(o *CollectorEndpointOptions)
|
|
|
|
|
|
|
|
type CollectorEndpointOptions struct {
|
2021-04-21 12:02:06 -07:00
|
|
|
// endpoint for sending spans directly to a collector.
|
|
|
|
endpoint string
|
|
|
|
|
|
|
|
// username to be used for authentication with the collector endpoint.
|
2019-10-04 16:07:42 -03:00
|
|
|
username string
|
|
|
|
|
2021-04-21 12:02:06 -07:00
|
|
|
// password to be used for authentication with the collector endpoint.
|
2019-10-04 16:07:42 -03:00
|
|
|
password string
|
2020-04-28 21:35:34 +03:00
|
|
|
|
|
|
|
// httpClient to be used to make requests to the collector endpoint.
|
|
|
|
httpClient *http.Client
|
2019-10-04 16:07:42 -03:00
|
|
|
}
|
|
|
|
|
2021-04-21 12:02:06 -07:00
|
|
|
// WithEndpoint is the URL for the Jaeger collector that spans are sent to.
|
|
|
|
// This option overrides any value set for the
|
|
|
|
// OTEL_EXPORTER_JAEGER_ENDPOINT environment variable.
|
|
|
|
// If this option is not passed and the environment variable is not set,
|
|
|
|
// "http://localhost:14250" will be used by default.
|
|
|
|
func WithEndpoint(endpoint string) CollectorEndpointOption {
|
|
|
|
return func(o *CollectorEndpointOptions) {
|
|
|
|
o.endpoint = endpoint
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// WithUsername sets the username to be used in the authorization header sent for all requests to the collector.
|
|
|
|
// This option overrides any value set for the
|
|
|
|
// OTEL_EXPORTER_JAEGER_USER environment variable.
|
|
|
|
// If this option is not passed and the environment variable is not set, no username will be set.
|
2020-04-21 23:30:57 -04:00
|
|
|
func WithUsername(username string) CollectorEndpointOption {
|
2019-10-04 16:07:42 -03:00
|
|
|
return func(o *CollectorEndpointOptions) {
|
|
|
|
o.username = username
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-04-21 12:02:06 -07:00
|
|
|
// WithPassword sets the password to be used in the authorization header sent for all requests to the collector.
|
|
|
|
// This option overrides any value set for the
|
|
|
|
// OTEL_EXPORTER_JAEGER_PASSWORD environment variable.
|
|
|
|
// If this option is not passed and the environment variable is not set, no password will be set.
|
2020-04-21 23:30:57 -04:00
|
|
|
func WithPassword(password string) CollectorEndpointOption {
|
2019-10-04 16:07:42 -03:00
|
|
|
return func(o *CollectorEndpointOptions) {
|
|
|
|
o.password = password
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-04-28 21:35:34 +03:00
|
|
|
// WithHTTPClient sets the http client to be used to make request to the collector endpoint.
|
|
|
|
func WithHTTPClient(client *http.Client) CollectorEndpointOption {
|
|
|
|
return func(o *CollectorEndpointOptions) {
|
|
|
|
o.httpClient = client
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-10-04 16:07:42 -03:00
|
|
|
// agentUploader implements batchUploader interface sending batches to
|
|
|
|
// Jaeger through the UDP agent.
|
|
|
|
type agentUploader struct {
|
|
|
|
client *agentClientUDP
|
|
|
|
}
|
|
|
|
|
|
|
|
// Compile-time assertion that *agentUploader satisfies batchUploader.
var _ batchUploader = (*agentUploader)(nil)
|
|
|
|
|
2021-04-22 14:47:27 +00:00
|
|
|
func (a *agentUploader) shutdown(ctx context.Context) error {
|
|
|
|
done := make(chan error, 1)
|
|
|
|
go func() {
|
|
|
|
done <- a.client.Close()
|
|
|
|
}()
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
// Prioritize not blocking the calling thread and just leak the
|
|
|
|
// spawned goroutine to close the client.
|
|
|
|
return ctx.Err()
|
|
|
|
case err := <-done:
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-04-19 14:27:22 +00:00
|
|
|
func (a *agentUploader) upload(ctx context.Context, batch *gen.Batch) error {
|
|
|
|
return a.client.EmitBatch(ctx, batch)
|
2019-10-04 16:07:42 -03:00
|
|
|
}
|
|
|
|
|
|
|
|
// collectorUploader is the batchUploader implementation that sends batches to
// Jaeger through the collector HTTP endpoint.
type collectorUploader struct {
	// endpoint is the URL that batches are POSTed to.
	endpoint string
	// username and password are sent as HTTP basic auth, and only when both
	// are non-empty.
	username, password string
	// httpClient performs the upload requests.
	httpClient *http.Client
}
|
|
|
|
|
|
|
|
// Compile-time assertion that *collectorUploader satisfies batchUploader.
var _ batchUploader = (*collectorUploader)(nil)
|
|
|
|
|
2021-04-22 14:47:27 +00:00
|
|
|
func (c *collectorUploader) shutdown(ctx context.Context) error {
|
|
|
|
// The Exporter will cancel any active exports and will prevent all
|
|
|
|
// subsequent exports, so nothing to do here.
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-04-19 14:27:22 +00:00
|
|
|
func (c *collectorUploader) upload(ctx context.Context, batch *gen.Batch) error {
|
2019-10-04 16:07:42 -03:00
|
|
|
body, err := serialize(batch)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-04-19 14:27:22 +00:00
|
|
|
req, err := http.NewRequestWithContext(ctx, "POST", c.endpoint, body)
|
2019-10-04 16:07:42 -03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if c.username != "" && c.password != "" {
|
|
|
|
req.SetBasicAuth(c.username, c.password)
|
|
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "application/x-thrift")
|
|
|
|
|
2020-04-28 21:35:34 +03:00
|
|
|
resp, err := c.httpClient.Do(req)
|
2019-10-04 16:07:42 -03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
_, _ = io.Copy(ioutil.Discard, resp.Body)
|
|
|
|
resp.Body.Close()
|
|
|
|
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
|
|
return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func serialize(obj thrift.TStruct) (*bytes.Buffer, error) {
|
|
|
|
buf := thrift.NewTMemoryBuffer()
|
2021-03-22 14:12:56 -05:00
|
|
|
if err := obj.Write(context.Background(), thrift.NewTBinaryProtocolConf(buf, &thrift.TConfiguration{})); err != nil {
|
2019-10-04 16:07:42 -03:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return buf.Buffer, nil
|
|
|
|
}
|