diff --git a/CHANGELOG.md b/CHANGELOG.md index 05853e937..ba4d40dc4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added - Add the `ReadOnlySpan` and `ReadWriteSpan` interfaces to provide better control for accessing span data. (#1360) +- `NewGRPCDriver` function returns a `ProtocolDriver` that maintains a single gRPC connection to the collector. (#1369) ### Changed @@ -19,6 +20,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Improve span duration accuracy. (#1360) - Migrated CI/CD from CircleCI to GitHub Actions (#1382) - Remove duplicate checkout from GitHub Actions workflow (#1407) +- `NewExporter` from `exporters/otlp` now takes a `ProtocolDriver` as a parameter. (#1369) +- Many OTLP Exporter options became gRPC ProtocolDriver options. (#1369) + ### Removed - Remove `errUninitializedSpan` as its only usage is now obsolete. (#1360) diff --git a/example/otel-collector/main.go b/example/otel-collector/main.go index a2452f874..3ba6d029e 100644 --- a/example/otel-collector/main.go +++ b/example/otel-collector/main.go @@ -49,11 +49,12 @@ func initProvider() func() { // `localhost:30080` address. Otherwise, replace `localhost` with the // address of your cluster. If you run the app inside k8s, then you can // probably connect directly to the service through dns - exp, err := otlp.NewExporter(ctx, + driver := otlp.NewGRPCDriver( otlp.WithInsecure(), otlp.WithAddress("localhost:30080"), otlp.WithGRPCDialOption(grpc.WithBlock()), // useful for testing ) + exp, err := otlp.NewExporter(ctx, driver) handleErr(err, "failed to create exporter") res, err := resource.New(ctx, diff --git a/exporters/otlp/example_test.go b/exporters/otlp/example_test.go index a34811e25..35d0727d6 100644 --- a/exporters/otlp/example_test.go +++ b/exporters/otlp/example_test.go @@ -29,7 +29,8 @@ import ( func Example_insecure() { ctx := context.Background() - exp, err := otlp.NewExporter(ctx, otlp.WithInsecure()) + driver := otlp.NewGRPCDriver(otlp.WithInsecure()) + exp, err := otlp.NewExporter(ctx, driver) if err != nil { log.Fatalf("Failed to create the collector exporter: %v", err) } @@ -74,7 +75,8 @@ func Example_withTLS() { } ctx := context.Background() - exp, err := otlp.NewExporter(ctx, otlp.WithTLSCredentials(creds)) + driver := otlp.NewGRPCDriver(otlp.WithTLSCredentials(creds)) + exp, err := otlp.NewExporter(ctx, driver) if err != nil { log.Fatalf("failed to create the collector exporter: %v", err) } diff --git a/exporters/otlp/connection.go b/exporters/otlp/grpcconnection.go similarity index 94% rename from exporters/otlp/connection.go rename to exporters/otlp/grpcconnection.go index 283d6d42d..c3c3af4dd 100644 --- a/exporters/otlp/connection.go +++ b/exporters/otlp/grpcconnection.go @@ -16,7 +16,6 @@ package otlp // import "go.opentelemetry.io/otel/exporters/otlp" import ( "context" - "fmt" "math/rand" "sync" "sync/atomic" @@ -37,9 +36,9 @@ type grpcConnection struct { cc *grpc.ClientConn // these fields are read-only after constructor is finished - c config + c grpcConnectionConfig metadata metadata.MD - newConnectionHandler func(cc *grpc.ClientConn) error + newConnectionHandler func(cc *grpc.ClientConn) // these channels are created once disconnectedCh chan bool @@ -52,12 +51,9 @@ type grpcConnection struct { closeBackgroundConnectionDoneCh func(ch chan struct{}) } -func newGRPCConnection(c config, handler func(cc *grpc.ClientConn) error) *grpcConnection { +func newGRPCConnection(c grpcConnectionConfig, handler func(cc *grpc.ClientConn)) *grpcConnection { conn := new(grpcConnection) conn.newConnectionHandler = handler - if c.collectorAddr == "" { - c.collectorAddr = fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort) - } conn.c = c if len(conn.c.headers) > 0 { conn.metadata = metadata.New(conn.c.headers) @@ -103,7 +99,7 @@ func (oc *grpcConnection) setStateDisconnected(err error) { case oc.disconnectedCh <- true: default: } - _ = oc.newConnectionHandler(nil) + oc.newConnectionHandler(nil) } func (oc *grpcConnection) setStateConnected() { @@ -180,7 +176,8 @@ func (oc *grpcConnection) connect(ctx context.Context) error { return err } oc.setConnection(cc) - return oc.newConnectionHandler(cc) + oc.newConnectionHandler(cc) + return nil } // setConnection sets cc as the client connection and returns true if @@ -245,8 +242,6 @@ func (oc *grpcConnection) shutdown(ctx context.Context) error { return ctx.Err() } - close(oc.disconnectedCh) - oc.mu.Lock() cc := oc.cc oc.cc = nil diff --git a/exporters/otlp/grpcdriver.go b/exporters/otlp/grpcdriver.go new file mode 100644 index 000000000..885cb1a15 --- /dev/null +++ b/exporters/otlp/grpcdriver.go @@ -0,0 +1,144 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp // import "go.opentelemetry.io/otel/exporters/otlp" + +import ( + "context" + "fmt" + "sync" + + "google.golang.org/grpc" + + colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" + coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" + metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" + tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1" + "go.opentelemetry.io/otel/exporters/otlp/internal/transform" + metricsdk "go.opentelemetry.io/otel/sdk/export/metric" + tracesdk "go.opentelemetry.io/otel/sdk/export/trace" +) + +type grpcDriver struct { + connection *grpcConnection + + lock sync.Mutex + metricsClient colmetricpb.MetricsServiceClient + tracesClient coltracepb.TraceServiceClient +} + +func NewGRPCDriver(opts ...GRPCConnectionOption) ProtocolDriver { + cfg := grpcConnectionConfig{ + collectorAddr: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort), + grpcServiceConfig: DefaultGRPCServiceConfig, + } + for _, opt := range opts { + opt(&cfg) + } + d := &grpcDriver{} + d.connection = newGRPCConnection(cfg, d.handleNewConnection) + return d +} + +func (d *grpcDriver) handleNewConnection(cc *grpc.ClientConn) { + d.lock.Lock() + defer d.lock.Unlock() + if cc != nil { + d.metricsClient = colmetricpb.NewMetricsServiceClient(cc) + d.tracesClient = coltracepb.NewTraceServiceClient(cc) + } else { + d.metricsClient = nil + d.tracesClient = nil + } +} + +func (d *grpcDriver) Start(ctx context.Context) error { + d.connection.startConnection(ctx) + return nil +} + +func (d *grpcDriver) Stop(ctx context.Context) error { + return d.connection.shutdown(ctx) +} + +func (d *grpcDriver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error { + if !d.connection.connected() { + return errDisconnected + } + ctx, cancel := d.connection.contextWithStop(ctx) + defer cancel() + + rms, err := transform.CheckpointSet(ctx, selector, cps, 1) + if err != nil { + return err + } + if len(rms) == 0 { + return nil + } + + return d.uploadMetrics(ctx, rms) +} + +func (d *grpcDriver) uploadMetrics(ctx context.Context, protoMetrics []*metricpb.ResourceMetrics) error { + ctx = d.connection.contextWithMetadata(ctx) + err := func() error { + d.lock.Lock() + defer d.lock.Unlock() + if d.metricsClient == nil { + return errNoClient + } + _, err := d.metricsClient.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{ + ResourceMetrics: protoMetrics, + }) + return err + }() + if err != nil { + d.connection.setStateDisconnected(err) + } + return err +} + +func (d *grpcDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { + if !d.connection.connected() { + return errDisconnected + } + ctx, cancel := d.connection.contextWithStop(ctx) + defer cancel() + + protoSpans := transform.SpanData(ss) + if len(protoSpans) == 0 { + return nil + } + + return d.uploadTraces(ctx, protoSpans) +} + +func (d *grpcDriver) uploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error { + ctx = d.connection.contextWithMetadata(ctx) + err := func() error { + d.lock.Lock() + defer d.lock.Unlock() + if d.tracesClient == nil { + return errNoClient + } + _, err := d.tracesClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: protoSpans, + }) + return err + }() + if err != nil { + d.connection.setStateDisconnected(err) + } + return err +} diff --git a/exporters/otlp/grpcoptions.go b/exporters/otlp/grpcoptions.go new file mode 100644 index 000000000..6d4e56097 --- /dev/null +++ b/exporters/otlp/grpcoptions.go @@ -0,0 +1,145 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp // import "go.opentelemetry.io/otel/exporters/otlp" + +import ( + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +const ( + // DefaultGRPCServiceConfig is the gRPC service config used if none is + // provided by the user. + // + // For more info on gRPC service configs: + // https://github.com/grpc/proposal/blob/master/A6-client-retries.md + // + // For more info on the RetryableStatusCodes we allow here: + // https://github.com/open-telemetry/oteps/blob/be2a3fcbaa417ebbf5845cd485d34fdf0ab4a2a4/text/0035-opentelemetry-protocol.md#export-response + // + // Note: MaxAttempts > 5 are treated as 5. See + // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#validation-of-retrypolicy + // for more details. + DefaultGRPCServiceConfig = `{ + "methodConfig":[{ + "name":[ + { "service":"opentelemetry.proto.collector.metrics.v1.MetricsService" }, + { "service":"opentelemetry.proto.collector.trace.v1.TraceService" } + ], + "retryPolicy":{ + "MaxAttempts":5, + "InitialBackoff":"0.3s", + "MaxBackoff":"5s", + "BackoffMultiplier":2, + "RetryableStatusCodes":[ + "UNAVAILABLE", + "CANCELLED", + "DEADLINE_EXCEEDED", + "RESOURCE_EXHAUSTED", + "ABORTED", + "OUT_OF_RANGE", + "UNAVAILABLE", + "DATA_LOSS" + ] + } + }] +}` +) + +type grpcConnectionConfig struct { + canDialInsecure bool + collectorAddr string + compressor string + reconnectionPeriod time.Duration + grpcServiceConfig string + grpcDialOptions []grpc.DialOption + headers map[string]string + clientCredentials credentials.TransportCredentials +} + +type GRPCConnectionOption func(cfg *grpcConnectionConfig) + +// WithInsecure disables client transport security for the exporter's gRPC connection +// just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure +// does. Note, by default, client security is required unless WithInsecure is used. +func WithInsecure() GRPCConnectionOption { + return func(cfg *grpcConnectionConfig) { + cfg.canDialInsecure = true + } +} + +// WithAddress allows one to set the address that the exporter will +// connect to the collector on. If unset, it will instead try to use +// connect to DefaultCollectorHost:DefaultCollectorPort. +func WithAddress(addr string) GRPCConnectionOption { + return func(cfg *grpcConnectionConfig) { + cfg.collectorAddr = addr + } +} + +// WithReconnectionPeriod allows one to set the delay between next connection attempt +// after failing to connect with the collector. +func WithReconnectionPeriod(rp time.Duration) GRPCConnectionOption { + return func(cfg *grpcConnectionConfig) { + cfg.reconnectionPeriod = rp + } +} + +// WithCompressor will set the compressor for the gRPC client to use when sending requests. +// It is the responsibility of the caller to ensure that the compressor set has been registered +// with google.golang.org/grpc/encoding. This can be done by encoding.RegisterCompressor. Some +// compressors auto-register on import, such as gzip, which can be registered by calling +// `import _ "google.golang.org/grpc/encoding/gzip"` +func WithCompressor(compressor string) GRPCConnectionOption { + return func(cfg *grpcConnectionConfig) { + cfg.compressor = compressor + } +} + +// WithHeaders will send the provided headers with gRPC requests +func WithHeaders(headers map[string]string) GRPCConnectionOption { + return func(cfg *grpcConnectionConfig) { + cfg.headers = headers + } +} + +// WithTLSCredentials allows the connection to use TLS credentials +// when talking to the server. It takes in grpc.TransportCredentials instead +// of say a Certificate file or a tls.Certificate, because the retrieving +// these credentials can be done in many ways e.g. plain file, in code tls.Config +// or by certificate rotation, so it is up to the caller to decide what to use. +func WithTLSCredentials(creds credentials.TransportCredentials) GRPCConnectionOption { + return func(cfg *grpcConnectionConfig) { + cfg.clientCredentials = creds + } +} + +// WithGRPCServiceConfig defines the default gRPC service config used. +func WithGRPCServiceConfig(serviceConfig string) GRPCConnectionOption { + return func(cfg *grpcConnectionConfig) { + cfg.grpcServiceConfig = serviceConfig + } +} + +// WithGRPCDialOption opens support to any grpc.DialOption to be used. If it conflicts +// with some other configuration the GRPC specified via the collector the ones here will +// take preference since they are set last. +func WithGRPCDialOption(opts ...grpc.DialOption) GRPCConnectionOption { + return func(cfg *grpcConnectionConfig) { + cfg.grpcDialOptions = opts + } +} diff --git a/exporters/otlp/options.go b/exporters/otlp/options.go index 3fdb9b64d..8a8432e4e 100644 --- a/exporters/otlp/options.go +++ b/exporters/otlp/options.go @@ -15,11 +15,6 @@ package otlp // import "go.opentelemetry.io/otel/exporters/otlp" import ( - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - metricsdk "go.opentelemetry.io/otel/sdk/export/metric" ) @@ -30,132 +25,19 @@ const ( // DefaultCollectorHost is the host address the Exporter will attempt // connect to if no collector address is provided. DefaultCollectorHost string = "localhost" - // DefaultGRPCServiceConfig is the gRPC service config used if none is - // provided by the user. - // - // For more info on gRPC service configs: - // https://github.com/grpc/proposal/blob/master/A6-client-retries.md - // - // For more info on the RetryableStatusCodes we allow here: - // https://github.com/open-telemetry/oteps/blob/be2a3fcbaa417ebbf5845cd485d34fdf0ab4a2a4/text/0035-opentelemetry-protocol.md#export-response - // - // Note: MaxAttempts > 5 are treated as 5. See - // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#validation-of-retrypolicy - // for more details. - DefaultGRPCServiceConfig = `{ - "methodConfig":[{ - "name":[ - { "service":"opentelemetry.proto.collector.metrics.v1.MetricsService" }, - { "service":"opentelemetry.proto.collector.trace.v1.TraceService" } - ], - "retryPolicy":{ - "MaxAttempts":5, - "InitialBackoff":"0.3s", - "MaxBackoff":"5s", - "BackoffMultiplier":2, - "RetryableStatusCodes":[ - "UNAVAILABLE", - "CANCELLED", - "DEADLINE_EXCEEDED", - "RESOURCE_EXHAUSTED", - "ABORTED", - "OUT_OF_RANGE", - "UNAVAILABLE", - "DATA_LOSS" - ] - } - }] -}` ) // ExporterOption are setting options passed to an Exporter on creation. type ExporterOption func(*config) type config struct { - canDialInsecure bool - collectorAddr string - compressor string - reconnectionPeriod time.Duration - grpcServiceConfig string - grpcDialOptions []grpc.DialOption - headers map[string]string - clientCredentials credentials.TransportCredentials exportKindSelector metricsdk.ExportKindSelector } -// WithInsecure disables client transport security for the exporter's gRPC connection -// just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure -// does. Note, by default, client security is required unless WithInsecure is used. -func WithInsecure() ExporterOption { - return func(cfg *config) { - cfg.canDialInsecure = true - } -} - -// WithAddress allows one to set the address that the exporter will -// connect to the collector on. If unset, it will instead try to use -// connect to DefaultCollectorHost:DefaultCollectorPort. -func WithAddress(addr string) ExporterOption { - return func(cfg *config) { - cfg.collectorAddr = addr - } -} - -// WithReconnectionPeriod allows one to set the delay between next connection attempt -// after failing to connect with the collector. -func WithReconnectionPeriod(rp time.Duration) ExporterOption { - return func(cfg *config) { - cfg.reconnectionPeriod = rp - } -} - -// WithCompressor will set the compressor for the gRPC client to use when sending requests. -// It is the responsibility of the caller to ensure that the compressor set has been registered -// with google.golang.org/grpc/encoding. This can be done by encoding.RegisterCompressor. Some -// compressors auto-register on import, such as gzip, which can be registered by calling -// `import _ "google.golang.org/grpc/encoding/gzip"` -func WithCompressor(compressor string) ExporterOption { - return func(cfg *config) { - cfg.compressor = compressor - } -} - -// WithHeaders will send the provided headers with gRPC requests -func WithHeaders(headers map[string]string) ExporterOption { - return func(cfg *config) { - cfg.headers = headers - } -} - -// WithTLSCredentials allows the connection to use TLS credentials -// when talking to the server. It takes in grpc.TransportCredentials instead -// of say a Certificate file or a tls.Certificate, because the retrieving -// these credentials can be done in many ways e.g. plain file, in code tls.Config -// or by certificate rotation, so it is up to the caller to decide what to use. -func WithTLSCredentials(creds credentials.TransportCredentials) ExporterOption { - return func(cfg *config) { - cfg.clientCredentials = creds - } -} - -// WithGRPCServiceConfig defines the default gRPC service config used. -func WithGRPCServiceConfig(serviceConfig string) ExporterOption { - return func(cfg *config) { - cfg.grpcServiceConfig = serviceConfig - } -} - -// WithGRPCDialOption opens support to any grpc.DialOption to be used. If it conflicts -// with some other configuration the GRPC specified via the collector the ones here will -// take preference since they are set last. -func WithGRPCDialOption(opts ...grpc.DialOption) ExporterOption { - return func(cfg *config) { - cfg.grpcDialOptions = opts - } -} - -// WithMetricExportKindSelector defines the ExportKindSelector used for selecting -// AggregationTemporality (i.e., Cumulative vs. Delta aggregation). +// WithMetricExportKindSelector defines the ExportKindSelector used +// for selecting AggregationTemporality (i.e., Cumulative vs. Delta +// aggregation). If not specified otherwise, exporter will use a +// cumulative export kind selector. func WithMetricExportKindSelector(selector metricsdk.ExportKindSelector) ExporterOption { return func(cfg *config) { cfg.exportKindSelector = selector diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index d040781e0..1e90f7a73 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -14,19 +14,11 @@ package otlp // import "go.opentelemetry.io/otel/exporters/otlp" -// This code was based on -// contrib.go.opencensus.io/exporter/ocagent/connection.go - import ( "context" "errors" "sync" - "google.golang.org/grpc" - - colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" - coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" - "go.opentelemetry.io/otel/exporters/otlp/internal/transform" "go.opentelemetry.io/otel/metric" metricsdk "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" @@ -37,30 +29,31 @@ import ( // from OpenTelemetry instrumented to code using OpenTelemetry protocol // buffers to a configurable receiver. type Exporter struct { - // mu protects the non-atomic and non-channel variables - mu sync.RWMutex - // senderMu protects the concurrent unsafe sends on the shared gRPC client connection. - senderMu sync.Mutex - started bool - traceExporter coltracepb.TraceServiceClient - metricExporter colmetricpb.MetricsServiceClient - cc *grpcConnection + cfg config + driver ProtocolDriver + + mu sync.RWMutex + started bool startOnce sync.Once stopOnce sync.Once - - exportKindSelector metricsdk.ExportKindSelector } var _ tracesdk.SpanExporter = (*Exporter)(nil) var _ metricsdk.Exporter = (*Exporter)(nil) -// newConfig initializes a config struct with default values and applies -// any ExporterOptions provided. -func newConfig(opts ...ExporterOption) config { - cfg := config{ - grpcServiceConfig: DefaultGRPCServiceConfig, +// NewExporter constructs a new Exporter and starts it. +func NewExporter(ctx context.Context, driver ProtocolDriver, opts ...ExporterOption) (*Exporter, error) { + exp := NewUnstartedExporter(driver, opts...) + if err := exp.Start(ctx); err != nil { + return nil, err + } + return exp, nil +} +// NewUnstartedExporter constructs a new Exporter and does not start it. +func NewUnstartedExporter(driver ProtocolDriver, opts ...ExporterOption) *Exporter { + cfg := config{ // Note: the default ExportKindSelector is specified // as Cumulative: // https://github.com/open-telemetry/opentelemetry-specification/issues/731 @@ -69,38 +62,10 @@ func newConfig(opts ...ExporterOption) config { for _, opt := range opts { opt(&cfg) } - return cfg -} - -// NewExporter constructs a new Exporter and starts it. -func NewExporter(ctx context.Context, opts ...ExporterOption) (*Exporter, error) { - exp := NewUnstartedExporter(opts...) - if err := exp.Start(ctx); err != nil { - return nil, err + return &Exporter{ + cfg: cfg, + driver: driver, } - return exp, nil -} - -// NewUnstartedExporter constructs a new Exporter and does not start it. -func NewUnstartedExporter(opts ...ExporterOption) *Exporter { - e := new(Exporter) - cfg := newConfig(opts...) - e.exportKindSelector = cfg.exportKindSelector - e.cc = newGRPCConnection(cfg, e.handleNewConnection) - return e -} - -func (e *Exporter) handleNewConnection(cc *grpc.ClientConn) error { - e.mu.Lock() - defer e.mu.Unlock() - if cc != nil { - e.metricExporter = colmetricpb.NewMetricsServiceClient(cc) - e.traceExporter = coltracepb.NewTraceServiceClient(cc) - } else { - e.metricExporter = nil - e.traceExporter = nil - } - return nil } var ( @@ -109,30 +74,26 @@ var ( errDisconnected = errors.New("exporter disconnected") ) -// Start dials to the collector, establishing a connection to it. It also -// initiates the Config and Trace services by sending over the initial -// messages that consist of the node identifier. Start invokes a background -// connector that will reattempt connections to the collector periodically -// if the connection dies. +// Start establishes connections to the OpenTelemetry collector. Starting an +// already started exporter returns an error. func (e *Exporter) Start(ctx context.Context) error { var err = errAlreadyStarted e.startOnce.Do(func() { e.mu.Lock() e.started = true e.mu.Unlock() - - err = nil - e.cc.startConnection(ctx) + err = e.driver.Start(ctx) }) return err } // Shutdown closes all connections and releases resources currently being used -// by the exporter. If the exporter is not started this does nothing. +// by the exporter. If the exporter is not started this does nothing. A shut +// down exporter can't be started again. Shutting down an already shut down +// exporter does nothing. func (e *Exporter) Shutdown(ctx context.Context) error { e.mu.RLock() - cc := e.cc started := e.started e.mu.RUnlock() @@ -143,10 +104,7 @@ func (e *Exporter) Shutdown(ctx context.Context) error { var err error e.stopOnce.Do(func() { - // Clean things up before checking this error. - err = cc.shutdown(ctx) - - // At this point we can change the state variable started + err = e.driver.Stop(ctx) e.mu.Lock() e.started = false e.mu.Unlock() @@ -159,75 +117,19 @@ func (e *Exporter) Shutdown(ctx context.Context) error { // interface. It transforms and batches metric Records into OTLP Metrics and // transmits them to the configured collector. func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error { - ctx, cancel := e.cc.contextWithStop(parent) - defer cancel() - - // Hardcode the number of worker goroutines to 1. We later will - // need to see if there's a way to adjust that number for longer - // running operations. - rms, err := transform.CheckpointSet(ctx, e, cps, 1) - if err != nil { - return err - } - - if !e.cc.connected() { - return errDisconnected - } - - err = func() error { - e.senderMu.Lock() - defer e.senderMu.Unlock() - if e.metricExporter == nil { - return errNoClient - } - _, err := e.metricExporter.Export(e.cc.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{ - ResourceMetrics: rms, - }) - return err - }() - if err != nil { - e.cc.setStateDisconnected(err) - } - return err + return e.driver.ExportMetrics(parent, cps, e.cfg.exportKindSelector) } // ExportKindFor reports back to the OpenTelemetry SDK sending this Exporter -// metric telemetry that it needs to be provided in a cumulative format. +// metric telemetry that it needs to be provided in a configured format. func (e *Exporter) ExportKindFor(desc *metric.Descriptor, kind aggregation.Kind) metricsdk.ExportKind { - return e.exportKindSelector.ExportKindFor(desc, kind) + return e.cfg.exportKindSelector.ExportKindFor(desc, kind) } -// ExportSpans exports a batch of SpanSnapshot. +// ExportSpans implements the +// "go.opentelemetry.io/otel/sdk/export/trace".SpanExporter interface. It +// transforms and batches trace SpanSnapshots into OTLP Trace and transmits them +// to the configured collector. func (e *Exporter) ExportSpans(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { - return e.uploadTraces(ctx, ss) -} - -func (e *Exporter) uploadTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { - ctx, cancel := e.cc.contextWithStop(ctx) - defer cancel() - - if !e.cc.connected() { - return nil - } - - protoSpans := transform.SpanData(ss) - if len(protoSpans) == 0 { - return nil - } - - err := func() error { - e.senderMu.Lock() - defer e.senderMu.Unlock() - if e.traceExporter == nil { - return errNoClient - } - _, err := e.traceExporter.Export(e.cc.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{ - ResourceSpans: protoSpans, - }) - return err - }() - if err != nil { - e.cc.setStateDisconnected(err) - } - return err + return e.driver.ExportTraces(ctx, ss) } diff --git a/exporters/otlp/otlp_integration_test.go b/exporters/otlp/otlp_integration_test.go index c452bcfa1..dab79d72b 100644 --- a/exporters/otlp/otlp_integration_test.go +++ b/exporters/otlp/otlp_integration_test.go @@ -25,28 +25,53 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" + + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/exporters/otlp" commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1" "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/number" - metricsdk "go.opentelemetry.io/otel/sdk/export/metric" + exportmetric "go.opentelemetry.io/otel/sdk/export/metric" exporttrace "go.opentelemetry.io/otel/sdk/export/trace" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/controller/push" processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" "go.opentelemetry.io/otel/sdk/metric/selector/simple" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" ) func TestNewExporter_endToEnd(t *testing.T) { tests := []struct { name string - additionalOpts []otlp.ExporterOption + additionalOpts []otlp.GRPCConnectionOption }{ { name: "StandardExporter", }, + { + name: "WithCompressor", + additionalOpts: []otlp.GRPCConnectionOption{ + otlp.WithCompressor(gzip.Name), + }, + }, + { + name: "WithGRPCServiceConfig", + additionalOpts: []otlp.GRPCConnectionOption{ + otlp.WithGRPCServiceConfig("{}"), + }, + }, + { + name: "WithGRPCDialOptions", + additionalOpts: []otlp.GRPCConnectionOption{ + otlp.WithGRPCDialOption(grpc.WithBlock()), + }, + }, } for _, test := range tests { @@ -56,7 +81,23 @@ func TestNewExporter_endToEnd(t *testing.T) { } } -func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) { +func newGRPCExporter(t *testing.T, ctx context.Context, address string, additionalOpts ...otlp.GRPCConnectionOption) *otlp.Exporter { + opts := []otlp.GRPCConnectionOption{ + otlp.WithInsecure(), + otlp.WithAddress(address), + otlp.WithReconnectionPeriod(50 * time.Millisecond), + } + + opts = append(opts, additionalOpts...) + driver := otlp.NewGRPCDriver(opts...) + exp, err := otlp.NewExporter(ctx, driver) + if err != nil { + t.Fatalf("failed to create a new collector exporter: %v", err) + } + return exp +} + +func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionOption) { mc := runMockColAtAddr(t, "localhost:56561") defer func() { @@ -65,20 +106,10 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) <-time.After(5 * time.Millisecond) - opts := []otlp.ExporterOption{ - otlp.WithInsecure(), - otlp.WithAddress(mc.address), - otlp.WithReconnectionPeriod(50 * time.Millisecond), - } - - opts = append(opts, additionalOpts...) ctx := context.Background() - exp, err := otlp.NewExporter(ctx, opts...) - if err != nil { - t.Fatalf("failed to create a new collector exporter: %v", err) - } + exp := newGRPCExporter(t, ctx, mc.address, additionalOpts...) defer func() { - ctx, cancel := context.WithTimeout(ctx, time.Second) + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() if err := exp.Shutdown(ctx); err != nil { panic(err) @@ -121,7 +152,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) } selector := simple.NewWithInexpensiveDistribution() - processor := processor.New(selector, metricsdk.StatelessExportKindSelector()) + processor := processor.New(selector, exportmetric.StatelessExportKindSelector()) pusher := push.New(processor, exp) pusher.Start() @@ -185,12 +216,22 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) // Flush and close. pusher.Stop() + func() { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err := tp1.Shutdown(ctx); err != nil { + t.Fatalf("failed to shut down a tracer provider 1: %v", err) + } + if err := tp2.Shutdown(ctx); err != nil { + t.Fatalf("failed to shut down a tracer provider 2: %v", err) + } + }() // Wait >2 cycles. <-time.After(40 * time.Millisecond) // Now shutdown the exporter - ctx, cancel := context.WithTimeout(ctx, time.Millisecond) + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() if err := exp.Shutdown(ctx); err != nil { t.Fatalf("failed to stop the exporter: %v", err) @@ -308,13 +349,7 @@ func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) { }() ctx := context.Background() - exp, err := otlp.NewExporter(ctx, - otlp.WithInsecure(), - otlp.WithReconnectionPeriod(50*time.Millisecond), - otlp.WithAddress(mc.address)) - if err != nil { - t.Fatalf("error creating exporter: %v", err) - } + exp := newGRPCExporter(t, ctx, mc.address) defer func() { if err := exp.Shutdown(ctx); err != nil { panic(err) @@ -344,13 +379,8 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { reconnectionPeriod := 20 * time.Millisecond ctx := context.Background() - exp, err := otlp.NewExporter(ctx, - otlp.WithInsecure(), - otlp.WithAddress(mc.address), + exp := newGRPCExporter(t, ctx, mc.address, otlp.WithReconnectionPeriod(reconnectionPeriod)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } defer func() { _ = exp.Shutdown(ctx) }() @@ -417,13 +447,7 @@ func TestNewExporter_collectorOnBadConnection(t *testing.T) { address := fmt.Sprintf("localhost:%s", collectorPortStr) ctx := context.Background() - exp, err := otlp.NewExporter(ctx, - otlp.WithInsecure(), - otlp.WithReconnectionPeriod(50*time.Millisecond), - otlp.WithAddress(address)) - if err != nil { - t.Fatalf("Despite an indefinite background reconnection, got error: %v", err) - } + exp := newGRPCExporter(t, ctx, address) _ = exp.Shutdown(ctx) } @@ -433,19 +457,9 @@ func TestNewExporter_withAddress(t *testing.T) { _ = mc.stop() }() - exp := otlp.NewUnstartedExporter( - otlp.WithInsecure(), - otlp.WithReconnectionPeriod(50*time.Millisecond), - otlp.WithAddress(mc.address)) - ctx := context.Background() - defer func() { - _ = exp.Shutdown(ctx) - }() - - if err := exp.Start(ctx); err != nil { - t.Fatalf("Unexpected Start error: %v", err) - } + exp := newGRPCExporter(t, ctx, mc.address) + _ = exp.Shutdown(ctx) } func TestNewExporter_withHeaders(t *testing.T) { @@ -455,12 +469,8 @@ func TestNewExporter_withHeaders(t *testing.T) { }() ctx := context.Background() - exp, _ := otlp.NewExporter(ctx, - otlp.WithInsecure(), - otlp.WithReconnectionPeriod(50*time.Millisecond), - otlp.WithAddress(mc.address), - otlp.WithHeaders(map[string]string{"header1": "value1"}), - ) + exp := newGRPCExporter(t, ctx, mc.address, + otlp.WithHeaders(map[string]string{"header1": "value1"})) require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}})) defer func() { @@ -482,11 +492,7 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { <-time.After(5 * time.Millisecond) ctx := context.Background() - exp, _ := otlp.NewExporter(ctx, - otlp.WithInsecure(), - otlp.WithReconnectionPeriod(50*time.Millisecond), - otlp.WithAddress(mc.address), - ) + exp := newGRPCExporter(t, ctx, mc.address) defer func() { _ = exp.Shutdown(ctx) @@ -517,19 +523,20 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { span.SetAttributes(testKvs...) span.End() - selector := simple.NewWithInexpensiveDistribution() - processor := processor.New(selector, metricsdk.StatelessExportKindSelector()) - pusher := push.New(processor, exp) - pusher.Start() - // Flush and close. - pusher.Stop() + func() { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err := tp.Shutdown(ctx); err != nil { + t.Fatalf("failed to shut down a tracer provider: %v", err) + } + }() // Wait >2 cycles. <-time.After(40 * time.Millisecond) // Now shutdown the exporter - ctx, cancel := context.WithTimeout(ctx, time.Millisecond) + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() if err := exp.Shutdown(ctx); err != nil { t.Fatalf("failed to stop the exporter: %v", err) @@ -623,3 +630,134 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { assert.Equal(t, expected[i], actual) } } + +type discCheckpointSet struct{} + +func (discCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelector, recordFunc func(exportmetric.Record) error) error { + desc := metric.NewDescriptor( + "foo", + metric.CounterInstrumentKind, + number.Int64Kind, + ) + res := resource.NewWithAttributes(label.String("a", "b")) + agg := sum.New(1) + start := time.Now().Add(-20 * time.Minute) + end := time.Now() + labels := label.NewSet() + rec := exportmetric.NewRecord(&desc, &labels, res, agg[0].Aggregation(), start, end) + return recordFunc(rec) +} + +func (discCheckpointSet) Lock() {} +func (discCheckpointSet) Unlock() {} +func (discCheckpointSet) RLock() {} +func (discCheckpointSet) RUnlock() {} + +func discSpanSnapshot() *exporttrace.SpanSnapshot { + return &exporttrace.SpanSnapshot{ + SpanContext: trace.SpanContext{ + TraceID: trace.TraceID{2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9}, + SpanID: trace.SpanID{3, 4, 5, 6, 7, 8, 9, 0}, + TraceFlags: trace.FlagsSampled, + }, + ParentSpanID: trace.SpanID{1, 2, 3, 4, 5, 6, 7, 8}, + SpanKind: trace.SpanKindInternal, + Name: "foo", + StartTime: time.Now().Add(-20 * time.Minute), + EndTime: time.Now(), + Attributes: []label.KeyValue{}, + MessageEvents: []exporttrace.Event{}, + Links: []trace.Link{}, + StatusCode: codes.Ok, + StatusMessage: "", + HasRemoteParent: false, + DroppedAttributeCount: 0, + DroppedMessageEventCount: 0, + DroppedLinkCount: 0, + ChildSpanCount: 0, + Resource: resource.NewWithAttributes(label.String("a", "b")), + InstrumentationLibrary: instrumentation.Library{ + Name: "bar", + Version: "0.0.0", + }, + } +} + +func TestDisconnected(t *testing.T) { + ctx := context.Background() + // The address is whatever, we want to be disconnected. But we + // setting a blocking connection, so dialing to the invalid + // address actually fails. + exp := newGRPCExporter(t, ctx, "invalid", + otlp.WithReconnectionPeriod(time.Hour), + otlp.WithGRPCDialOption( + grpc.WithBlock(), + grpc.FailOnNonTempDialError(true), + ), + ) + defer func() { + assert.NoError(t, exp.Shutdown(ctx)) + }() + + assert.Error(t, exp.Export(ctx, discCheckpointSet{})) + assert.Error(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{discSpanSnapshot()})) +} + +type emptyCheckpointSet struct{} + +func (emptyCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelector, recordFunc func(exportmetric.Record) error) error { + return nil +} + +func (emptyCheckpointSet) Lock() {} +func (emptyCheckpointSet) Unlock() {} +func (emptyCheckpointSet) RLock() {} +func (emptyCheckpointSet) RUnlock() {} + +func TestEmptyData(t *testing.T) { + mc := runMockColAtAddr(t, "localhost:56561") + + defer func() { + _ = mc.stop() + }() + + <-time.After(5 * time.Millisecond) + + ctx := context.Background() + exp := newGRPCExporter(t, ctx, mc.address) + defer func() { + assert.NoError(t, exp.Shutdown(ctx)) + }() + + assert.NoError(t, exp.ExportSpans(ctx, nil)) + assert.NoError(t, exp.Export(ctx, emptyCheckpointSet{})) +} + +type failCheckpointSet struct{} + +func (failCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelector, recordFunc func(exportmetric.Record) error) error { + return fmt.Errorf("fail") +} + +func (failCheckpointSet) Lock() {} +func (failCheckpointSet) Unlock() {} +func (failCheckpointSet) RLock() {} +func (failCheckpointSet) RUnlock() {} + +func TestFailedMetricTransform(t *testing.T) { + mc := runMockColAtAddr(t, "localhost:56561") + + defer func() { + _ = mc.stop() + }() + + <-time.After(5 * time.Millisecond) + + ctx := context.Background() + exp := newGRPCExporter(t, ctx, mc.address) + defer func() { + assert.NoError(t, exp.Shutdown(ctx)) + }() + + assert.Error(t, exp.Export(ctx, failCheckpointSet{})) +} diff --git a/exporters/otlp/otlp_metric_test.go b/exporters/otlp/otlp_metric_test.go index 120b88ad8..660d17ece 100644 --- a/exporters/otlp/otlp_metric_test.go +++ b/exporters/otlp/otlp_metric_test.go @@ -23,7 +23,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1" metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1" @@ -36,8 +35,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/resource" - - "google.golang.org/grpc" ) var ( @@ -55,28 +52,6 @@ func pointTime() uint64 { return uint64(intervalEnd.UnixNano()) } -type metricsServiceClientStub struct { - rm []metricpb.ResourceMetrics -} - -func (m *metricsServiceClientStub) Export(ctx context.Context, in *colmetricpb.ExportMetricsServiceRequest, opts ...grpc.CallOption) (*colmetricpb.ExportMetricsServiceResponse, error) { - for _, rm := range in.GetResourceMetrics() { - if rm == nil { - continue - } - m.rm = append(m.rm, *rm) - } - return &colmetricpb.ExportMetricsServiceResponse{}, nil -} - -func (m *metricsServiceClientStub) ResourceMetrics() []metricpb.ResourceMetrics { - return m.rm -} - -func (m *metricsServiceClientStub) Reset() { - m.rm = nil -} - type checkpointSet struct { sync.RWMutex records []metricsdk.Record @@ -765,12 +740,8 @@ func TestStatelessExportKind(t *testing.T) { } } -// What works single-threaded should work multi-threaded func runMetricExportTests(t *testing.T, opts []ExporterOption, rs []record, expected []metricpb.ResourceMetrics) { - exp := NewUnstartedExporter(opts...) - msc := &metricsServiceClientStub{} - exp.metricExporter = msc - exp.started = true + exp, driver := newExporter(t, opts...) recs := map[label.Distinct][]metricsdk.Record{} resources := map[label.Distinct]*resource.Resource{} @@ -830,7 +801,7 @@ func runMetricExportTests(t *testing.T, opts []ExporterOption, rs []record, expe resource, instrumentationLibrary string } got := map[key][]*metricpb.Metric{} - for _, rm := range msc.ResourceMetrics() { + for _, rm := range driver.rm { for _, ilm := range rm.InstrumentationLibraryMetrics { k := key{ resource: rm.GetResource().String(), @@ -910,10 +881,7 @@ func runMetricExportTests(t *testing.T, opts []ExporterOption, rs []record, expe } func TestEmptyMetricExport(t *testing.T) { - msc := &metricsServiceClientStub{} - exp := NewUnstartedExporter() - exp.metricExporter = msc - exp.started = true + exp, driver := newExporter(t) for _, test := range []struct { records []metricsdk.Record @@ -928,8 +896,8 @@ func TestEmptyMetricExport(t *testing.T) { []metricpb.ResourceMetrics(nil), }, } { - msc.Reset() + driver.Reset() require.NoError(t, exp.Export(context.Background(), &checkpointSet{records: test.records})) - assert.Equal(t, test.want, msc.ResourceMetrics()) + assert.Equal(t, test.want, driver.rm) } } diff --git a/exporters/otlp/otlp_span_test.go b/exporters/otlp/otlp_span_test.go index e967e507d..c2c6aa6c9 100644 --- a/exporters/otlp/otlp_span_test.go +++ b/exporters/otlp/otlp_span_test.go @@ -20,10 +20,8 @@ import ( "time" "github.com/stretchr/testify/assert" - "google.golang.org/grpc" "go.opentelemetry.io/otel/codes" - coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1" resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1" tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1" @@ -35,33 +33,8 @@ import ( "go.opentelemetry.io/otel/sdk/resource" ) -type traceServiceClientStub struct { - rs []tracepb.ResourceSpans -} - -func (t *traceServiceClientStub) Export(ctx context.Context, in *coltracepb.ExportTraceServiceRequest, opts ...grpc.CallOption) (*coltracepb.ExportTraceServiceResponse, error) { - for _, rs := range in.GetResourceSpans() { - if rs == nil { - continue - } - t.rs = append(t.rs, *rs) - } - return &coltracepb.ExportTraceServiceResponse{}, nil -} - -func (t *traceServiceClientStub) ResourceSpans() []tracepb.ResourceSpans { - return t.rs -} - -func (t *traceServiceClientStub) Reset() { - t.rs = nil -} - func TestExportSpans(t *testing.T) { - tsc := &traceServiceClientStub{} - exp := NewUnstartedExporter() - exp.traceExporter = tsc - exp.started = true + exp, driver := newExporter(t) // March 31, 2020 5:01:26 1234nanos (UTC) startTime := time.Unix(1585674086, 1234) @@ -352,8 +325,8 @@ func TestExportSpans(t *testing.T) { }, }, } { - tsc.Reset() + driver.Reset() assert.NoError(t, exp.ExportSpans(context.Background(), test.sd)) - assert.ElementsMatch(t, test.want, tsc.ResourceSpans()) + assert.ElementsMatch(t, test.want, driver.rs) } } diff --git a/exporters/otlp/otlp_test.go b/exporters/otlp/otlp_test.go index afcb28ddf..1a10f5e28 100644 --- a/exporters/otlp/otlp_test.go +++ b/exporters/otlp/otlp_test.go @@ -20,25 +20,89 @@ import ( "sync" "testing" "time" + + "github.com/stretchr/testify/require" + + metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" + tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1" + "go.opentelemetry.io/otel/exporters/otlp/internal/transform" + + metricsdk "go.opentelemetry.io/otel/sdk/export/metric" + tracesdk "go.opentelemetry.io/otel/sdk/export/trace" ) +type stubProtocolDriver struct { + rm []metricpb.ResourceMetrics + rs []tracepb.ResourceSpans +} + +var _ ProtocolDriver = (*stubProtocolDriver)(nil) + +func (m *stubProtocolDriver) Start(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } +} + +func (m *stubProtocolDriver) Stop(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } +} + +func (m *stubProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error { + rms, err := transform.CheckpointSet(parent, selector, cps, 1) + if err != nil { + return err + } + for _, rm := range rms { + if rm == nil { + continue + } + m.rm = append(m.rm, *rm) + } + return nil +} + +func (m *stubProtocolDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { + for _, rs := range transform.SpanData(ss) { + if rs == nil { + continue + } + m.rs = append(m.rs, *rs) + } + return nil +} + +func (m *stubProtocolDriver) Reset() { + m.rm = nil + m.rs = nil +} + +func newExporter(t *testing.T, opts ...ExporterOption) (*Exporter, *stubProtocolDriver) { + driver := &stubProtocolDriver{} + exp, err := NewExporter(context.Background(), driver, opts...) + require.NoError(t, err) + return exp, driver +} + func TestExporterShutdownHonorsTimeout(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() - e := NewUnstartedExporter() - orig := e.cc.closeBackgroundConnectionDoneCh - e.cc.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { - go func() { - <-ctx.Done() - orig(ch) - }() - } + e := NewUnstartedExporter(&stubProtocolDriver{}) if err := e.Start(ctx); err != nil { t.Fatalf("failed to start exporter: %v", err) } innerCtx, innerCancel := context.WithTimeout(ctx, time.Microsecond) + <-time.After(time.Second) if err := e.Shutdown(innerCtx); err == nil { t.Error("expected context DeadlineExceeded error, got nil") } else if !errors.Is(err, context.DeadlineExceeded) { @@ -51,14 +115,7 @@ func TestExporterShutdownHonorsCancel(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() - e := NewUnstartedExporter() - orig := e.cc.closeBackgroundConnectionDoneCh - e.cc.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { - go func() { - <-ctx.Done() - orig(ch) - }() - } + e := NewUnstartedExporter(&stubProtocolDriver{}) if err := e.Start(ctx); err != nil { t.Fatalf("failed to start exporter: %v", err) } @@ -77,7 +134,7 @@ func TestExporterShutdownNoError(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() - e := NewUnstartedExporter() + e := NewUnstartedExporter(&stubProtocolDriver{}) if err := e.Start(ctx); err != nil { t.Fatalf("failed to start exporter: %v", err) } @@ -89,7 +146,7 @@ func TestExporterShutdownNoError(t *testing.T) { func TestExporterShutdownManyTimes(t *testing.T) { ctx := context.Background() - e, err := NewExporter(ctx) + e, err := NewExporter(ctx, &stubProtocolDriver{}) if err != nil { t.Fatalf("failed to start an exporter: %v", err) } diff --git a/exporters/otlp/protocoldriver.go b/exporters/otlp/protocoldriver.go new file mode 100644 index 000000000..c4992ba1e --- /dev/null +++ b/exporters/otlp/protocoldriver.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp // import "go.opentelemetry.io/otel/exporters/otlp" + +import ( + "context" + + metricsdk "go.opentelemetry.io/otel/sdk/export/metric" + tracesdk "go.opentelemetry.io/otel/sdk/export/trace" +) + +// ProtocolDriver is an interface used by OTLP exporter. It's +// responsible for connecting to and disconnecting from the collector, +// and for transforming traces and metrics into wire format and +// transmitting them to the collector. +type ProtocolDriver interface { + // Start should establish connection(s) to endpoint(s). It is + // called just once by the exporter, so the implementation + // does not need to worry about idempotence and locking. + Start(ctx context.Context) error + // Stop should close the connections. The function is called + // only once by the exporter, so the implementation does not + // need to worry about idempotence, but it may be called + // concurrently with ExportMetrics or ExportTraces, so proper + // locking is required. The function serves as a + // synchronization point - after the function returns, the + // process of closing connections is assumed to be finished. + Stop(ctx context.Context) error + // ExportMetrics should transform the passed metrics to the + // wire format and send it to the collector. May be called + // concurrently with ExportTraces, so the manager needs to + // take this into account by doing proper locking. + ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error + // ExportTraces should transform the passed traces to the wire + // format and send it to the collector. May be called + // concurrently with ExportMetrics, so the manager needs to + // take this into account by doing proper locking. + ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error +}