1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2024-12-26 21:05:00 +02:00
opentelemetry-go/exporter/trace/jaeger/jaeger.go
Krzesimir Nowak 3362421c9b Drop the registry package (#130)
This is to shrink the PR #100.

The only place where the registry.Variable type was used was metrics,
so just inline that type into its only user. The use of the
registry.Variable type in core.Key was limited to the Name field.

The stats package also used the registry.Variable type, but seems that
also only the Name field was used and the package is going to be
dropped anyway.
2019-09-19 11:20:02 -07:00

327 lines
8.5 KiB
Go

// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package jaeger
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"time"
"google.golang.org/grpc/codes"
"github.com/apache/thrift/lib/go/thrift"
"google.golang.org/api/support/bundler"
gen "go.opentelemetry.io/exporter/trace/jaeger/internal/gen-go/jaeger"
"go.opentelemetry.io/sdk/trace"
)
// defaultServiceName is the Jaeger service name reported by the exporter
// when Options.Process.ServiceName is left empty.
const defaultServiceName = "OpenTelemetry"
// Options are the options to be used when initializing a Jaeger exporter.
//
// At least one of CollectorEndpoint and AgentEndpoint must be set. When both
// are set, NewExporter uses the collector and does not create an agent client.
type Options struct {
// CollectorEndpoint is the full URL of the Jaeger HTTP Thrift collector.
// For example, http://localhost:14268/api/traces
CollectorEndpoint string
// AgentEndpoint instructs the exporter to send spans to jaeger-agent at this address.
// For example, localhost:6831.
AgentEndpoint string
// OnError is the hook called when uploading a bundle of spans fails.
// If no custom hook is set, errors are logged with log.Printf.
// Optional.
OnError func(err error)
// Username to be used if basic auth is required.
// Basic auth is only applied to collector uploads, and only when both
// Username and Password are non-empty.
// Optional.
Username string
// Password to be used if basic auth is required.
// Optional.
Password string
// Process contains the information about the exporting process.
Process Process
// BufferMaxCount defines the total number of spans that can be buffered in memory.
// Zero leaves the bundler's default limit in place.
BufferMaxCount int
}
// NewExporter returns a trace.Exporter implementation that exports
// the collected spans to Jaeger.
//
// Exactly one transport is selected: when o.CollectorEndpoint is set, spans
// are posted to the HTTP Thrift collector; otherwise a UDP client for
// o.AgentEndpoint is created. An error is returned when neither endpoint is
// configured or when the agent client cannot be created.
//
// Upload failures are reported to o.OnError, or logged when no hook is set.
func NewExporter(o Options) (*Exporter, error) {
	if o.CollectorEndpoint == "" && o.AgentEndpoint == "" {
		return nil, errors.New("missing endpoint for Jaeger exporter")
	}

	var endpoint string
	var client *agentClientUDP
	if o.CollectorEndpoint != "" {
		endpoint = o.CollectorEndpoint
	} else {
		var err error
		client, err = newAgentClientUDP(o.AgentEndpoint, udpPacketMaxLength)
		if err != nil {
			return nil, err
		}
	}

	onError := func(err error) {
		if o.OnError != nil {
			o.OnError(err)
			return
		}
		log.Printf("Error when uploading spans to Jaeger: %v", err)
	}

	service := o.Process.ServiceName
	if service == "" {
		service = defaultServiceName
	}

	// attributeToTag returns nil for value types it cannot convert (including
	// the zero Tag, whose value is nil). Drop those instead of storing a nil
	// *gen.Tag in the process, mirroring how spanDataToThrift filters span
	// attributes. The slice stays non-nil even when it ends up empty.
	tags := make([]*gen.Tag, 0, len(o.Process.Tags))
	for _, tag := range o.Process.Tags {
		if t := attributeToTag(tag.key, tag.value); t != nil {
			tags = append(tags, t)
		}
	}

	e := &Exporter{
		endpoint:      endpoint,
		agentEndpoint: o.AgentEndpoint,
		client:        client,
		username:      o.Username,
		password:      o.Password,
		process: &gen.Process{
			ServiceName: service,
			Tags:        tags,
		},
	}

	// The local is deliberately not named "bundler" so that it does not shadow
	// the imported package.
	b := bundler.NewBundler((*gen.Span)(nil), func(bundle interface{}) {
		if err := e.upload(bundle.([]*gen.Span)); err != nil {
			onError(err)
		}
	})

	// Set BufferedByteLimit with the total number of spans that are permissible to be held in memory.
	// This needs to be done since the size of messages is always set to 1. Failing to set this would allow
	// 1G messages to be held in memory since that is the default value of BufferedByteLimit.
	if o.BufferMaxCount != 0 {
		b.BufferedByteLimit = o.BufferMaxCount
	}

	e.bundler = b
	return e, nil
}
// Process contains the information exported to Jaeger about the source
// of the trace data.
type Process struct {
// ServiceName is the Jaeger service name.
// When empty, NewExporter substitutes defaultServiceName.
ServiceName string
// Tags are added to Jaeger Process exports.
// Each one is converted with attributeToTag when the exporter is created.
Tags []Tag
}
// Tag defines a key-value pair.
// It is limited to the possible conversions to *jaeger.Tag by attributeToTag,
// which handles bool, string, int64, int32 and float64 values.
type Tag struct {
// key is the tag name.
key string
// value is the tag payload; its dynamic type selects the Jaeger tag type.
value interface{}
}
// Exporter is an implementation of trace.Exporter that uploads spans to Jaeger.
type Exporter struct {
// endpoint is the collector URL; when non-empty, uploads go to the collector over HTTP.
endpoint string
// agentEndpoint is the jaeger-agent address copied from Options.AgentEndpoint.
agentEndpoint string
// process is attached to every uploaded batch.
process *gen.Process
// bundler buffers spans and hands them to upload in bundles.
bundler *bundler.Bundler
// client is the UDP agent client; NewExporter leaves it nil when a collector endpoint is configured.
client *agentClientUDP
// username and password are used for HTTP basic auth on collector uploads when both are non-empty.
username, password string
}
// Compile-time assertion that *Exporter implements trace.Exporter.
var _ trace.Exporter = (*Exporter)(nil)
// ExportSpan converts a SpanData to its Jaeger Thrift representation and
// queues it on the exporter's bundler for upload.
func (e *Exporter) ExportSpan(data *trace.SpanData) {
	span := spanDataToThrift(data)

	// Every span is added with a size of 1, so the bundler's limits count
	// spans rather than bytes. The error returned by Add is intentionally
	// discarded for now.
	// TODO(jbd): Handle oversized bundlers.
	_ = e.bundler.Add(span, 1)
}
// spanDataToThrift builds the Jaeger Thrift span for data.
//
// Span attributes become tags (attributes that attributeToTag cannot convert
// are skipped), followed by "status.code" and "status.message" tags and, for
// any non-OK status, an "error" tag. Each message event becomes a log entry
// whose fields are the event attributes plus a "message" field.
func spanDataToThrift(data *trace.SpanData) *gen.Span {
	tags := make([]*gen.Tag, 0, len(data.Attributes))
	for key, value := range data.Attributes {
		if t := attributeToTag(key, value); t != nil {
			tags = append(tags, t)
		}
	}
	tags = append(tags, attributeToTag("status.code", int32(data.Status)))
	tags = append(tags, attributeToTag("status.message", data.Status.String()))

	// Ensure that if Status.Code is not OK, that we set the "error" tag on the Jaeger span.
	// See Issue https://github.com/census-instrumentation/opencensus-go/issues/1041
	if data.Status != codes.OK {
		tags = append(tags, attributeToTag("error", true))
	}

	// logs stays nil when the span carries no message events.
	var logs []*gen.Log
	for _, event := range data.MessageEvents {
		fields := make([]*gen.Tag, 0, len(event.Attributes))
		for _, kv := range event.Attributes {
			if t := attributeToTag(kv.Key.Name, kv.Value.Emit()); t != nil {
				fields = append(fields, t)
			}
		}
		fields = append(fields, attributeToTag("message", event.Message))

		entry := &gen.Log{
			// The event does not carry its own time yet, so the conversion
			// time (in microseconds) is used instead.
			//Timestamp: a.Time.UnixNano() / 1000,
			//TODO: [rghetia] update when time is supported in the event.
			Timestamp: time.Now().UnixNano() / int64(time.Microsecond),
			Fields:    fields,
		}
		logs = append(logs, entry)
	}

	//TODO: [rghetia] add links.
	//
	//var refs []*gen.SpanRef
	//for _, link := range data.Links {
	//	refs = append(refs, &gen.SpanRef{
	//		TraceIdHigh: bytesToInt64(link.TraceID[0:8]),
	//		TraceIdLow:  bytesToInt64(link.TraceID[8:16]),
	//		SpanId:      bytesToInt64(link.SpanID[:]),
	//	})
	//}

	// Start time and duration are expressed in microseconds.
	startMicros := data.StartTime.UnixNano() / int64(time.Microsecond)
	durationMicros := int64(data.EndTime.Sub(data.StartTime) / time.Microsecond)

	span := &gen.Span{
		TraceIdHigh:   int64(data.SpanContext.TraceID.High),
		TraceIdLow:    int64(data.SpanContext.TraceID.Low),
		SpanId:        int64(data.SpanContext.SpanID),
		ParentSpanId:  int64(data.ParentSpanID),
		OperationName: data.Name, // TODO: if span kind is added then add prefix "Sent"/"Recv"
		Flags:         int32(data.SpanContext.TraceOptions),
		StartTime:     startMicros,
		Duration:      durationMicros,
		Tags:          tags,
		Logs:          logs,
		// TODO: goes with Links.
		// References: refs,
	}
	return span
}
// attributeToTag converts a key and an attribute value into a Jaeger tag.
//
// The dynamic type of a picks the tag type: bool, string, int64 and float64
// map directly, and int32 is widened to a long. For any other type
// (including a nil value) the result is nil.
//
// TODO(rghetia): remove interface{}. see https://github.com/open-telemetry/opentelemetry-go/pull/112/files#r321444786
func attributeToTag(key string, a interface{}) *gen.Tag {
	switch value := a.(type) {
	case bool:
		return &gen.Tag{Key: key, VType: gen.TagType_BOOL, VBool: &value}
	case string:
		return &gen.Tag{Key: key, VType: gen.TagType_STRING, VStr: &value}
	case int64:
		return &gen.Tag{Key: key, VType: gen.TagType_LONG, VLong: &value}
	case int32:
		// Jaeger only has a 64-bit integer tag type.
		widened := int64(value)
		return &gen.Tag{Key: key, VType: gen.TagType_LONG, VLong: &widened}
	case float64:
		return &gen.Tag{Key: key, VType: gen.TagType_DOUBLE, VDouble: &value}
	default:
		return nil
	}
}
// Flush waits for exported trace spans to be uploaded.
//
// This is useful if your program is ending and you do not want to lose recent spans.
// It delegates to the Flush method of the exporter's bundler.
func (e *Exporter) Flush() {
e.bundler.Flush()
}
// upload wraps spans and the exporter's process in a batch and sends it
// through the configured transport: the HTTP collector when an endpoint is
// set, the UDP agent otherwise.
func (e *Exporter) upload(spans []*gen.Span) error {
	batch := &gen.Batch{Process: e.process, Spans: spans}
	if e.endpoint == "" {
		return e.uploadAgent(batch)
	}
	return e.uploadCollector(batch)
}
// uploadAgent sends batch through the exporter's UDP agent client and
// returns whatever error EmitBatch reports.
func (e *Exporter) uploadAgent(batch *gen.Batch) error {
return e.client.EmitBatch(batch)
}
// uploadCollector serializes batch and POSTs it to the collector endpoint
// using http.DefaultClient.
//
// Basic auth is attached only when both username and password are set. An
// error is returned when serialization, request construction or the HTTP
// exchange fails, or when the response status is outside the 2xx range.
func (e *Exporter) uploadCollector(batch *gen.Batch) error {
	payload, err := serialize(batch)
	if err != nil {
		return err
	}

	request, err := http.NewRequest("POST", e.endpoint, payload)
	if err != nil {
		return err
	}
	request.Header.Set("Content-Type", "application/x-thrift")
	if e.username != "" && e.password != "" {
		request.SetBasicAuth(e.username, e.password)
	}

	resp, err := http.DefaultClient.Do(request)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Drain the body before closing it; the content itself is not needed and
	// a failure to read it is ignored.
	_, _ = io.Copy(ioutil.Discard, resp.Body)

	if code := resp.StatusCode; 200 <= code && code < 300 {
		return nil
	}
	return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode)
}
// serialize writes obj with the Thrift binary protocol into an in-memory
// buffer and returns that buffer, or the error reported by obj.Write.
func serialize(obj thrift.TStruct) (*bytes.Buffer, error) {
	mem := thrift.NewTMemoryBuffer()
	protocol := thrift.NewTBinaryProtocolTransport(mem)

	err := obj.Write(protocol)
	if err != nil {
		return nil, err
	}
	return mem.Buffer, nil
}