mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-18 03:22:12 +02:00
295 lines
9.6 KiB
Go
295 lines
9.6 KiB
Go
|
// Copyright 2019, OpenTelemetry Authors
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
package stackdriver
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"log"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
traceapi "cloud.google.com/go/trace/apiv2"
|
||
|
"golang.org/x/oauth2/google"
|
||
|
"google.golang.org/api/option"
|
||
|
|
||
|
"go.opentelemetry.io/api/key"
|
||
|
"go.opentelemetry.io/sdk/export"
|
||
|
"go.opentelemetry.io/sdk/trace"
|
||
|
)
|
||
|
|
||
|
// Option is function type that is passed to the exporter initialization function.
|
||
|
type Option func(*options)
|
||
|
|
||
|
// options contains options for configuring the exporter.
|
||
|
type options struct {
|
||
|
// ProjectID is the identifier of the Stackdriver
|
||
|
// project the user is uploading the stats data to.
|
||
|
// If not set, this will default to your "Application Default Credentials".
|
||
|
// For details see: https://developers.google.com/accounts/docs/application-default-credentials.
|
||
|
//
|
||
|
// It will be used in the project_id label of a Stackdriver monitored
|
||
|
// resource if the resource does not inherently belong to a specific
|
||
|
// project, e.g. on-premise resource like k8s_container or generic_task.
|
||
|
ProjectID string
|
||
|
|
||
|
// Location is the identifier of the GCP or AWS cloud region/zone in which
|
||
|
// the data for a resource is stored.
|
||
|
// If not set, it will default to the location provided by the metadata server.
|
||
|
//
|
||
|
// It will be used in the location label of a Stackdriver monitored resource
|
||
|
// if the resource does not inherently belong to a specific project, e.g.
|
||
|
// on-premise resource like k8s_container or generic_task.
|
||
|
Location string
|
||
|
|
||
|
// OnError is the hook to be called when there is
|
||
|
// an error uploading the stats or tracing data.
|
||
|
// If no custom hook is set, errors are logged.
|
||
|
// Optional.
|
||
|
OnError func(err error)
|
||
|
|
||
|
// MonitoringClientOptions are additional options to be passed
|
||
|
// to the underlying Stackdriver Monitoring API client.
|
||
|
// Optional.
|
||
|
MonitoringClientOptions []option.ClientOption
|
||
|
|
||
|
// TraceClientOptions are additional options to be passed
|
||
|
// to the underlying Stackdriver Trace API client.
|
||
|
// Optional.
|
||
|
TraceClientOptions []option.ClientOption
|
||
|
|
||
|
// BatchDelayThreshold determines the max amount of time
|
||
|
// the exporter can wait before uploading view data or trace spans to
|
||
|
// the backend.
|
||
|
// Optional.
|
||
|
BatchDelayThreshold time.Duration
|
||
|
|
||
|
// MaxQueueSize is the maximum queue size to buffer spans for delayed processing.
|
||
|
// See the detailed definition in https://godoc.org/go.opentelemetry.io/sdk/trace#BatchSpanProcessorOptions.
|
||
|
// Default is 2048. Optional.
|
||
|
MaxQueueSize int
|
||
|
|
||
|
// MaxExportBatchSize is the maximum number of spans to process in a single batch.
|
||
|
// See the detailed definition in https://godoc.org/go.opentelemetry.io/sdk/trace#BatchSpanProcessorOptions.
|
||
|
// Default is 512. Optional.
|
||
|
MaxExportBatchSize int
|
||
|
|
||
|
// TraceSpansBufferMaxBytes is the maximum size (in bytes) of spans that
|
||
|
// will be buffered in memory before being dropped.
|
||
|
//
|
||
|
// If unset, a default of 8MB will be used.
|
||
|
// TraceSpansBufferMaxBytes int
|
||
|
|
||
|
// DefaultTraceAttributes will be appended to every span that is exported to
|
||
|
// Stackdriver Trace.
|
||
|
DefaultTraceAttributes map[string]interface{}
|
||
|
|
||
|
// Context allows you to provide a custom context for API calls.
|
||
|
//
|
||
|
// This context will be used several times: first, to create Stackdriver
|
||
|
// trace and metric clients, and then every time a new batch of traces or
|
||
|
// stats needs to be uploaded.
|
||
|
//
|
||
|
// Do not set a timeout on this context. Instead, set the Timeout option.
|
||
|
//
|
||
|
// If unset, context.Background() will be used.
|
||
|
Context context.Context
|
||
|
|
||
|
// SkipCMD enforces to skip all the CreateMetricDescriptor calls.
|
||
|
// These calls are important in order to configure the unit of the metrics,
|
||
|
// but in some cases all the exported metrics are builtin (unit is configured)
|
||
|
// or the unit is not important.
|
||
|
SkipCMD bool
|
||
|
|
||
|
// Timeout for all API calls. If not set, defaults to 5 seconds.
|
||
|
Timeout time.Duration
|
||
|
|
||
|
// ReportingInterval sets the interval between reporting metrics.
|
||
|
// If it is set to zero then default value is used.
|
||
|
ReportingInterval time.Duration
|
||
|
|
||
|
// NumberOfWorkers sets the number of go rountines that send requests
|
||
|
// to Stackdriver Monitoring. This is only used for Proto metrics export
|
||
|
// for now. The minimum number of workers is 1.
|
||
|
NumberOfWorkers int
|
||
|
}
|
||
|
|
||
|
// WithProjectID sets Google Cloud Platform project as projectID.
|
||
|
// Without using this option, it automatically detects the project ID
|
||
|
// from the default credential detection process.
|
||
|
// Please find the detailed order of the default credentail detection proecess on the doc:
|
||
|
// https://godoc.org/golang.org/x/oauth2/google#FindDefaultCredentials
|
||
|
func WithProjectID(projectID string) func(o *options) {
|
||
|
return func(o *options) {
|
||
|
o.ProjectID = projectID
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithOnError sets the hook to be called when there is an error
|
||
|
// occurred on uploading the span data to Stackdriver.
|
||
|
// If no custom hook is set, errors are logged.
|
||
|
func WithOnError(onError func(err error)) func(o *options) {
|
||
|
return func(o *options) {
|
||
|
o.OnError = onError
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithTraceClientOptions sets additionial client options for tracing.
|
||
|
func WithTraceClientOptions(opts []option.ClientOption) func(o *options) {
|
||
|
return func(o *options) {
|
||
|
o.TraceClientOptions = opts
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithContext sets the context that trace exporter and metric exporter
|
||
|
// relies on.
|
||
|
func WithContext(ctx context.Context) func(o *options) {
|
||
|
return func(o *options) {
|
||
|
o.Context = ctx
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (o *options) handleError(err error) {
|
||
|
if o.OnError != nil {
|
||
|
o.OnError(err)
|
||
|
return
|
||
|
}
|
||
|
log.Printf("Failed to export to Stackdriver: %v", err)
|
||
|
}
|
||
|
|
||
|
// defaultTimeout is used as default when timeout is not set in newContextWithTimout.
|
||
|
const defaultTimeout = 5 * time.Second
|
||
|
|
||
|
// Exporter is a trace exporter that uploads data to Stackdriver.
|
||
|
//
|
||
|
// TODO(yoshifumi): add a metrics exporter once the spec definition
|
||
|
// process and the sampler implementation are done.
|
||
|
type Exporter struct {
|
||
|
once sync.Once
|
||
|
traceExporter *traceExporter
|
||
|
spanProcessor trace.SpanProcessor
|
||
|
}
|
||
|
|
||
|
// NewExporter creates a new Exporter thats implements trace.Exporter.
|
||
|
//
|
||
|
// TODO(yoshifumi): add a metrics exporter one the spec definition
|
||
|
// process and the sampler implementation are done.
|
||
|
func NewExporter(opts ...Option) (*Exporter, error) {
|
||
|
o := options{}
|
||
|
for _, opt := range opts {
|
||
|
opt(&o)
|
||
|
}
|
||
|
if o.ProjectID == "" {
|
||
|
ctx := o.Context
|
||
|
if ctx == nil {
|
||
|
ctx = context.Background()
|
||
|
}
|
||
|
creds, err := google.FindDefaultCredentials(ctx, traceapi.DefaultAuthScopes()...)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("Stackdriver: %v", err)
|
||
|
}
|
||
|
if creds.ProjectID == "" {
|
||
|
return nil, errors.New("Stackdriver: no project found with application default credentials")
|
||
|
}
|
||
|
o.ProjectID = creds.ProjectID
|
||
|
}
|
||
|
te, err := newTraceExporter(&o)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return &Exporter{
|
||
|
traceExporter: te,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
func newContextWithTimeout(ctx context.Context, timeout time.Duration) (context.Context, func()) {
|
||
|
if ctx == nil {
|
||
|
ctx = context.Background()
|
||
|
}
|
||
|
if timeout <= 0 {
|
||
|
timeout = defaultTimeout
|
||
|
}
|
||
|
return context.WithTimeout(ctx, timeout)
|
||
|
}
|
||
|
|
||
|
// RegisterBatchSpanProcessor registers e as BatchSpanProcessor.
|
||
|
func (e *Exporter) RegisterBatchSpanProcessor() error {
|
||
|
opts := []trace.BatchSpanProcessorOption{}
|
||
|
|
||
|
if e.traceExporter.o.BatchDelayThreshold > 0 {
|
||
|
opts = append(opts, trace.WithScheduleDelayMillis(e.traceExporter.o.BatchDelayThreshold))
|
||
|
}
|
||
|
if e.traceExporter.o.MaxQueueSize > 0 {
|
||
|
opts = append(opts, trace.WithMaxQueueSize(e.traceExporter.o.MaxQueueSize))
|
||
|
}
|
||
|
if e.traceExporter.o.MaxExportBatchSize > 0 {
|
||
|
opts = append(opts, trace.WithMaxExportBatchSize(e.traceExporter.o.MaxExportBatchSize))
|
||
|
}
|
||
|
|
||
|
var err error
|
||
|
e.once.Do(func() {
|
||
|
e.spanProcessor, err = trace.NewBatchSpanProcessor(e, opts...)
|
||
|
trace.RegisterSpanProcessor(e.spanProcessor)
|
||
|
})
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// RegisterSimpleSpanProcessor registers e as SimpleSpanProcessor.
|
||
|
func (e *Exporter) RegisterSimpleSpanProcessor() {
|
||
|
e.once.Do(func() {
|
||
|
e.spanProcessor = trace.NewSimpleSpanProcessor(e)
|
||
|
trace.RegisterSpanProcessor(e.spanProcessor)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// ExportSpan exports a SpanData to Stackdriver Trace.
|
||
|
func (e *Exporter) ExportSpan(ctx context.Context, sd *export.SpanData) {
|
||
|
if len(e.traceExporter.o.DefaultTraceAttributes) > 0 {
|
||
|
sd = e.sdWithDefaultTraceAttributes(sd)
|
||
|
}
|
||
|
e.traceExporter.ExportSpan(ctx, sd)
|
||
|
}
|
||
|
|
||
|
// ExportSpans exports a slice of SpanData to Stackdriver Trace in batch
|
||
|
func (e *Exporter) ExportSpans(ctx context.Context, sds []*export.SpanData) {
|
||
|
e.traceExporter.ExportSpans(ctx, sds)
|
||
|
}
|
||
|
|
||
|
func (e *Exporter) sdWithDefaultTraceAttributes(sd *export.SpanData) *export.SpanData {
|
||
|
newSD := *sd
|
||
|
for k, v := range e.traceExporter.o.DefaultTraceAttributes {
|
||
|
switch val := v.(type) {
|
||
|
case bool:
|
||
|
newSD.Attributes = append(newSD.Attributes, key.New(k).Bool(val))
|
||
|
case int64:
|
||
|
newSD.Attributes = append(newSD.Attributes, key.New(k).Int64(val))
|
||
|
case float64:
|
||
|
newSD.Attributes = append(newSD.Attributes, key.New(k).Float64(val))
|
||
|
case string:
|
||
|
newSD.Attributes = append(newSD.Attributes, key.New(k).String(val))
|
||
|
}
|
||
|
}
|
||
|
newSD.Attributes = append(newSD.Attributes, sd.Attributes...)
|
||
|
return &newSD
|
||
|
}
|
||
|
|
||
|
// Shutdown unregisters spanProcessor.
|
||
|
func (e *Exporter) Shutdown() {
|
||
|
trace.UnregisterSpanProcessor(e.spanProcessor)
|
||
|
e.spanProcessor = nil
|
||
|
}
|