1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2024-12-12 10:04:29 +02:00
opentelemetry-go/exporters/otlp/otlp.go
Johannes Liebermann c3c4273ecc
Add RO/RW span interfaces (#1360)
* Store span data directly in the span

- Nesting only some of a span's data in a `data` field (with the rest
  of the data living directly in the `span` struct) is confusing.
- export.SpanData is meant to be an immutable *snapshot* of a span,
  not the "authoritative" state of the span.
- Refactor attributesMap.toSpanData into toKeyValue and make it
  return a []label.KeyValue which is clearer than modifying a struct
  passed to the function.
- Read droppedCount from the attributesMap as a separate operation
  instead of setting it from within attributesMap.toSpanData.
- Set a span's end time in the span itself rather than in the
  SpanData to allow reading the span's end time after a span has
  ended.
- Set a span's end time as soon as possible within span.End so that
  we don't influence the span's end time with operations such as
  fetching span processors and generating span data.
- Remove error handling for uninitialized spans. This check seems to
  be necessary only because we used to have an *export.SpanData field
  which could be nil. Now that we no longer have this field I think we
  can safely remove the check. The error isn't used anywhere else so
  remove it, too.

* Store parent as trace.SpanContext

The spec requires that the parent field of a Span be a Span, a
SpanContext or null.

Rather than extracting the parent's span ID from the trace.SpanContext
which we get from the tracer, store the trace.SpanContext as is and
explicitly extract the parent's span ID where necessary.

* Add ReadOnlySpan interface

Use this interface instead of export.SpanData in places where reading
information from a span is necessary. Use export.SpanData only when
exporting spans.

* Add ReadWriteSpan interface

Use this interface instead of export.SpanData in places where it is
necessary to read information from a span and write to it at the same
time.

* Rename export.SpanData to SpanSnapshot

SpanSnapshot represents the nature of this type as well as its
intended use more accurately.

Clarify the purpose of SpanSnapshot in the docs and emphasize what
should and should not be done with it.

* Rephrase attributesMap doc comment

"refreshes" is wrong for plural ("updates").

* Refactor span.End()

- Improve accuracy of span duration. Record span end time ASAP. We
  want to measure a user operation as accurately as possible, which
  means we want to mark the end time of a span as soon as possible
  after span.End() is called. Any operations we do inside span.End()
  before storing the end time affect the total duration of the span,
  and although these operations are rather fast at the moment they
  still seem to affect the duration of the span by "artificially"
  adding time between the start and end timestamps. This is relevant
  only in cases where the end time isn't explicitly specified.
- Remove redundant idempotence check. Now that IsRecording() is based
  on the value of span.endTime, IsRecording() will always return
  false after span.End() has been called because span.endTime won't
  be zero. This means we no longer need span.endOnce.
- Improve TestEndSpanTwice so that it also ensures subsequent calls
  to span.End() don't modify the span's end time.

* Update changelog

Co-authored-by: Tyler Yahn <codingalias@gmail.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
2020-12-10 21:15:44 -08:00

234 lines
6.6 KiB
Go

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlp // import "go.opentelemetry.io/otel/exporters/otlp"
// This code was based on
// contrib.go.opencensus.io/exporter/ocagent/connection.go
import (
"context"
"errors"
"sync"
"google.golang.org/grpc"
colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1"
"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
"go.opentelemetry.io/otel/metric"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
)
// Exporter is an OpenTelemetry exporter. It exports both traces and metrics
// from OpenTelemetry-instrumented code, encoded as OpenTelemetry protocol
// buffers, to a configurable receiver.
//
// Exporter is asserted in this file to satisfy both tracesdk.SpanExporter
// and metricsdk.Exporter.
type Exporter struct {
// mu protects the non-atomic and non-channel variables
mu sync.RWMutex
// senderMu protects the concurrent unsafe sends on the shared gRPC client connection.
senderMu sync.Mutex
// started is set to true by Start and back to false by Shutdown.
started bool
// traceExporter and metricExporter are the collector service clients.
// handleNewConnection assigns them for a non-nil connection and resets
// them to nil otherwise.
traceExporter coltracepb.TraceServiceClient
metricExporter colmetricpb.MetricsServiceClient
// cc is the connection object created by newGRPCConnection in
// NewUnstartedExporter.
cc *grpcConnection
// startOnce and stopOnce make the bodies of Start and Shutdown run at most
// once each.
startOnce sync.Once
stopOnce sync.Once
// exportKindSelector is copied from the config and consulted by
// ExportKindFor.
exportKindSelector metricsdk.ExportKindSelector
}
// Compile-time checks that *Exporter implements both the span exporter and
// the metric exporter interfaces.
var (
	_ tracesdk.SpanExporter = (*Exporter)(nil)
	_ metricsdk.Exporter    = (*Exporter)(nil)
)
// newConfig builds a config pre-populated with the package defaults and then
// runs every supplied ExporterOption against it, in the order given, so that
// options can override those defaults.
func newConfig(opts ...ExporterOption) config {
	var cfg config
	cfg.grpcServiceConfig = DefaultGRPCServiceConfig
	// Note: the default ExportKindSelector is specified
	// as Cumulative:
	// https://github.com/open-telemetry/opentelemetry-specification/issues/731
	cfg.exportKindSelector = metricsdk.CumulativeExportKindSelector()

	for i := range opts {
		opts[i](&cfg)
	}
	return cfg
}
// NewExporter creates an Exporter configured by opts and immediately starts
// it with ctx. If starting fails, that error is returned and no Exporter is
// handed back.
func NewExporter(ctx context.Context, opts ...ExporterOption) (*Exporter, error) {
	exporter := NewUnstartedExporter(opts...)
	err := exporter.Start(ctx)
	if err != nil {
		return nil, err
	}
	return exporter, nil
}
// NewUnstartedExporter creates an Exporter configured by opts without
// starting it; call Start on the result to begin connecting.
func NewUnstartedExporter(opts ...ExporterOption) *Exporter {
	cfg := newConfig(opts...)
	exp := &Exporter{exportKindSelector: cfg.exportKindSelector}
	// handleNewConnection is handed to the connection as its callback.
	exp.cc = newGRPCConnection(cfg, exp.handleNewConnection)
	return exp
}
// handleNewConnection updates the metric and trace service clients to match
// cc while holding e.mu. A non-nil cc yields fresh clients bound to it; a nil
// cc clears both clients. It always returns nil.
func (e *Exporter) handleNewConnection(cc *grpc.ClientConn) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	if cc == nil {
		e.metricExporter = nil
		e.traceExporter = nil
		return nil
	}

	e.metricExporter = colmetricpb.NewMetricsServiceClient(cc)
	e.traceExporter = coltracepb.NewTraceServiceClient(cc)
	return nil
}
// Sentinel errors returned by the Exporter.
var (
	// errAlreadyStarted is returned by Start on every call after the first.
	errAlreadyStarted = errors.New("already started")
	// errDisconnected is returned by Export when the connection reports
	// that it is not connected.
	errDisconnected = errors.New("exporter disconnected")
	// errNoClient is returned when the service client needed for an export
	// is nil.
	errNoClient = errors.New("no client")
)
// Start marks the exporter as started and asks the underlying connection to
// start connecting, using ctx. Only the first call does any work; every
// later call leaves the exporter untouched and returns errAlreadyStarted.
func (e *Exporter) Start(ctx context.Context) error {
	firstCall := false
	e.startOnce.Do(func() {
		firstCall = true

		e.mu.Lock()
		e.started = true
		e.mu.Unlock()

		e.cc.startConnection(ctx)
	})

	if !firstCall {
		return errAlreadyStarted
	}
	return nil
}
// Shutdown shuts down the exporter's connection and marks the exporter as no
// longer started. It returns nil without doing anything when the exporter is
// not started. The connection shutdown runs at most once; the error from
// that single run is returned to the caller that triggered it, and any
// other call returns nil.
func (e *Exporter) Shutdown(ctx context.Context) error {
	e.mu.RLock()
	conn, running := e.cc, e.started
	e.mu.RUnlock()

	if !running {
		return nil
	}

	var shutdownErr error
	e.stopOnce.Do(func() {
		// Tear the connection down first; its error is what gets reported.
		shutdownErr = conn.shutdown(ctx)

		// Only now flip the started flag back.
		e.mu.Lock()
		e.started = false
		e.mu.Unlock()
	})
	return shutdownErr
}
// Export implements the "go.opentelemetry.io/otel/sdk/export/metric".Exporter
// interface. It transforms and batches metric Records into OTLP Metrics and
// transmits them to the configured collector.
//
// It returns the transformation error if the CheckpointSet cannot be
// converted, errDisconnected if the connection reports that it is not
// connected, errNoClient if no metrics service client is currently set, and
// otherwise the error (if any) from the collector call. Any error from the
// send step is also reported to the connection via setStateDisconnected.
func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error {
	ctx, cancel := e.cc.contextWithStop(parent)
	defer cancel()

	// Hardcode the number of worker goroutines to 1. We later will
	// need to see if there's a way to adjust that number for longer
	// running operations.
	rms, err := transform.CheckpointSet(ctx, e, cps, 1)
	if err != nil {
		return err
	}

	if !e.cc.connected() {
		return errDisconnected
	}

	err = func() error {
		e.senderMu.Lock()
		defer e.senderMu.Unlock()

		// handleNewConnection replaces metricExporter while holding e.mu,
		// so the field must be read under e.mu as well; senderMu alone does
		// not order this read against that write.
		e.mu.RLock()
		client := e.metricExporter
		e.mu.RUnlock()
		if client == nil {
			return errNoClient
		}

		_, err := client.Export(e.cc.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{
			ResourceMetrics: rms,
		})
		return err
	}()
	if err != nil {
		e.cc.setStateDisconnected(err)
	}
	return err
}
// ExportKindFor reports the ExportKind the OpenTelemetry SDK should use when
// handing this Exporter metric telemetry for desc aggregated with kind. The
// answer is delegated to the Exporter's configured ExportKindSelector.
func (e *Exporter) ExportKindFor(desc *metric.Descriptor, kind aggregation.Kind) metricsdk.ExportKind {
	selector := e.exportKindSelector
	return selector.ExportKindFor(desc, kind)
}
// ExportSpans exports a batch of SpanSnapshot by handing ctx and ss to
// uploadTraces and returning its result unchanged.
func (e *Exporter) ExportSpans(ctx context.Context, ss []*tracesdk.SpanSnapshot) error {
	err := e.uploadTraces(ctx, ss)
	return err
}
// uploadTraces converts ss with transform.SpanData and sends the result to
// the collector's trace service.
//
// It returns nil without sending anything when the connection reports that
// it is not connected or when the conversion yields no spans. It returns
// errNoClient if no trace service client is currently set, and otherwise the
// error (if any) from the collector call. Any error from the send step is
// also reported to the connection via setStateDisconnected.
//
// NOTE(review): unlike Export, which returns errDisconnected, a disconnected
// exporter here drops the batch silently. Confirm this is intentional.
func (e *Exporter) uploadTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error {
	ctx, cancel := e.cc.contextWithStop(ctx)
	defer cancel()

	if !e.cc.connected() {
		return nil
	}

	protoSpans := transform.SpanData(ss)
	if len(protoSpans) == 0 {
		return nil
	}

	err := func() error {
		e.senderMu.Lock()
		defer e.senderMu.Unlock()

		// handleNewConnection replaces traceExporter while holding e.mu, so
		// the field must be read under e.mu as well; senderMu alone does not
		// order this read against that write.
		e.mu.RLock()
		client := e.traceExporter
		e.mu.RUnlock()
		if client == nil {
			return errNoClient
		}

		_, err := client.Export(e.cc.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{
			ResourceSpans: protoSpans,
		})
		return err
	}()
	if err != nil {
		e.cc.setStateDisconnected(err)
	}
	return err
}