1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2024-12-12 10:04:29 +02:00
opentelemetry-go/exporters/otlp/otlp.go
Joshua MacDonald 0e2fdfc682
Support cumulative, delta, and pass-through exporters (#840)
* Update Process()

* Checkpoint

* Add subtractor; fix test

* Fix all simple integrator tests

* Build the rest (checkpoint)

* Pass all but Prometheus tests

* Precommit pass

* Add aggregation.Kind argument to ExportKindFor

* Remove Subtractor support

* Remove dead test code

* Restore the Subtractor code

* Fix the tests

* Comments

* Add tests for MetricKind

* Add ChangeSign test

* Test ExportKind

* New file

* Rename ChangeSign

* Remove a TODO, add a TODO

* Remove Stateful remnants

* Typo

* Typo

* Test an invalid export kind

* Comments

* Lint

* Apply suggestions from code review

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
2020-06-22 22:59:51 -07:00

307 lines
8.0 KiB
Go

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// code in this package is mostly copied from contrib.go.opencensus.io/exporter/ocagent/connection.go
package otlp
import (
"context"
"errors"
"fmt"
"sync"
"unsafe"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1"
coltracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/trace/v1"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
)
type Exporter struct {
// mu protects the non-atomic and non-channel variables
mu sync.RWMutex
// senderMu protects the concurrent unsafe sends on the shared gRPC client connection.
senderMu sync.Mutex
started bool
traceExporter coltracepb.TraceServiceClient
metricExporter colmetricpb.MetricsServiceClient
grpcClientConn *grpc.ClientConn
lastConnectErrPtr unsafe.Pointer
startOnce sync.Once
stopCh chan bool
disconnectedCh chan bool
backgroundConnectionDoneCh chan bool
c Config
metadata metadata.MD
}
var _ tracesdk.SpanBatcher = (*Exporter)(nil)
var _ metricsdk.Exporter = (*Exporter)(nil)
func configureOptions(cfg *Config, opts ...ExporterOption) {
for _, opt := range opts {
opt(cfg)
}
}
func NewExporter(opts ...ExporterOption) (*Exporter, error) {
exp := NewUnstartedExporter(opts...)
if err := exp.Start(); err != nil {
return nil, err
}
return exp, nil
}
func NewUnstartedExporter(opts ...ExporterOption) *Exporter {
e := new(Exporter)
e.c = Config{
numWorkers: DefaultNumWorkers,
grpcServiceConfig: DefaultGRPCServiceConfig,
}
configureOptions(&e.c, opts...)
if len(e.c.headers) > 0 {
e.metadata = metadata.New(e.c.headers)
}
// TODO (rghetia): add resources
return e
}
var (
errAlreadyStarted = errors.New("already started")
errNotStarted = errors.New("not started")
errDisconnected = errors.New("exporter disconnected")
errStopped = errors.New("exporter stopped")
errContextCanceled = errors.New("context canceled")
)
// Start dials to the collector, establishing a connection to it. It also
// initiates the Config and Trace services by sending over the initial
// messages that consist of the node identifier. Start invokes a background
// connector that will reattempt connections to the collector periodically
// if the connection dies.
func (e *Exporter) Start() error {
var err = errAlreadyStarted
e.startOnce.Do(func() {
e.mu.Lock()
e.started = true
e.disconnectedCh = make(chan bool, 1)
e.stopCh = make(chan bool)
e.backgroundConnectionDoneCh = make(chan bool)
e.mu.Unlock()
// An optimistic first connection attempt to ensure that
// applications under heavy load can immediately process
// data. See https://github.com/census-ecosystem/opencensus-go-exporter-ocagent/pull/63
if err := e.connect(); err == nil {
e.setStateConnected()
} else {
e.setStateDisconnected(err)
}
go e.indefiniteBackgroundConnection()
err = nil
})
return err
}
func (e *Exporter) prepareCollectorAddress() string {
if e.c.collectorAddr != "" {
return e.c.collectorAddr
}
return fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort)
}
func (e *Exporter) enableConnections(cc *grpc.ClientConn) error {
e.mu.RLock()
started := e.started
e.mu.RUnlock()
if !started {
return errNotStarted
}
e.mu.Lock()
// If previous clientConn is same as the current then just return.
// This doesn't happen right now as this func is only called with new ClientConn.
// It is more about future-proofing.
if e.grpcClientConn == cc {
e.mu.Unlock()
return nil
}
// If the previous clientConn was non-nil, close it
if e.grpcClientConn != nil {
_ = e.grpcClientConn.Close()
}
e.grpcClientConn = cc
e.traceExporter = coltracepb.NewTraceServiceClient(cc)
e.metricExporter = colmetricpb.NewMetricsServiceClient(cc)
e.mu.Unlock()
return nil
}
func (e *Exporter) contextWithMetadata(ctx context.Context) context.Context {
if e.metadata.Len() > 0 {
return metadata.NewOutgoingContext(ctx, e.metadata)
}
return ctx
}
func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) {
addr := e.prepareCollectorAddress()
dialOpts := []grpc.DialOption{}
if e.c.grpcServiceConfig != "" {
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(e.c.grpcServiceConfig))
}
if e.c.clientCredentials != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(e.c.clientCredentials))
} else if e.c.canDialInsecure {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
if e.c.compressor != "" {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(e.c.compressor)))
}
if len(e.c.grpcDialOptions) != 0 {
dialOpts = append(dialOpts, e.c.grpcDialOptions...)
}
ctx := e.contextWithMetadata(context.Background())
return grpc.DialContext(ctx, addr, dialOpts...)
}
// Stop shuts down all the connections and resources
// related to the exporter.
// If the exporter is not started then this func does nothing.
func (e *Exporter) Stop() error {
e.mu.RLock()
cc := e.grpcClientConn
started := e.started
e.mu.RUnlock()
if !started {
return nil
}
// Now close the underlying gRPC connection.
var err error
if cc != nil {
err = cc.Close()
}
// At this point we can change the state variable started
e.mu.Lock()
e.started = false
e.mu.Unlock()
close(e.stopCh)
// Ensure that the backgroundConnector returns
<-e.backgroundConnectionDoneCh
return err
}
// Export implements the "go.opentelemetry.io/otel/sdk/export/metric".Exporter
// interface. It transforms and batches metric Records into OTLP Metrics and
// transmits them to the configured collector.
func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error {
// Unify the parent context Done signal with the exporter stopCh.
ctx, cancel := context.WithCancel(parent)
defer cancel()
go func(ctx context.Context, cancel context.CancelFunc) {
select {
case <-ctx.Done():
case <-e.stopCh:
cancel()
}
}(ctx, cancel)
rms, err := transform.CheckpointSet(ctx, e, cps, e.c.numWorkers)
if err != nil {
return err
}
if !e.connected() {
return errDisconnected
}
select {
case <-e.stopCh:
return errStopped
case <-ctx.Done():
return errContextCanceled
default:
e.senderMu.Lock()
_, err := e.metricExporter.Export(e.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: rms,
})
e.senderMu.Unlock()
if err != nil {
return err
}
}
return nil
}
func (e *Exporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) metricsdk.ExportKind {
return metricsdk.PassThroughExporter
}
func (e *Exporter) ExportSpan(ctx context.Context, sd *tracesdk.SpanData) {
e.uploadTraces(ctx, []*tracesdk.SpanData{sd})
}
func (e *Exporter) ExportSpans(ctx context.Context, sds []*tracesdk.SpanData) {
e.uploadTraces(ctx, sds)
}
func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) {
select {
case <-e.stopCh:
return
default:
if !e.connected() {
return
}
protoSpans := transform.SpanData(sdl)
if len(protoSpans) == 0 {
return
}
e.senderMu.Lock()
_, err := e.traceExporter.Export(e.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{
ResourceSpans: protoSpans,
})
e.senderMu.Unlock()
if err != nil {
e.setStateDisconnected(err)
}
}
}