1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-02-09 13:37:12 +02:00

Another batch of cleanups in otlp exporter (#1357)

* Move connection logic into grpcConnection object

If we will need to maintain more than one connection in future, this
splitting off will come in handy.

Co-authored-by: Stefan Prisca <stefan.prisca@gmail.com>

* Make another channel a signal channel

There is another channel that serves as a one-time signal, where
channel's data type does not matter.

* Reorder and document connection members

This is to make clear that the lock is guarding only the connection
since it can be changed by multiple goroutines, and other members are
either atomic or read-only.

* Move stop signal into connection

The stop channel was rather useless on the exporter side - the primary
reason for existence of this channel is to stop a background
reconnecting goroutine. Since the goroutine lives entirely within
grpcConnection object, move the stop channel here. Also expose a
function to unify the stop channel with the context cancellation, so
exporter can use it without knowing anything about stop channels.

Also make export functions a bit more consistent.

* Do not run reconnection routine when being stopped too

It's possible that both disconnected channel and stop channel will be
triggered around the same time, so the goroutine is as likely to start
reconnecting as to return from the goroutine. Make sure we return if
the stop channel is closed.

* Nil clients on connection error

Set clients to nil on connection error, so we don't try to send the
data over a bad connection, but return a "no client" error
immediately.

* Do not call new connection handler within critical section

It's rather risky to call a callback coming from outside within a
critical section. Move it out.

* Add context parameter to connection routines

Connecting to the collector may also take its time, so it can be
useful in some cases to pass a context with a deadline. Currently we
just pass a background context, so this commit does not really change
any behavior. The follow-up commits will make a use of it, though.

* Add context parameter to NewExporter and Start

It makes it possible to limit the time spent on connecting to the
collector.

* Stop connecting on shutdown

Dialling to grpc service ignored the closing of the stop channel, but
this can be easily changed.

* Close connection after background is shut down

That way we can make sure that there won't be a window between closing
a connection and waiting for the background goroutine to return, where
the new connection could be established.

* Remove unnecessary nil check

This member is never nil, unless the Exporter is created like
&Exporter{}, which is not a thing we support anyway.

* Update changelog

Co-authored-by: Stefan Prisca <stefan.prisca@gmail.com>
This commit is contained in:
Krzesimir Nowak 2020-11-24 20:50:05 +01:00 committed by GitHub
parent e0819780f9
commit 5a728db2e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 322 additions and 251 deletions

View File

@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Changed
- Move the OpenCensus example into `example` directory. (#1359)
- `NewExporter` and `Start` functions in `go.opentelemetry.io/otel/exporters/otlp` now receive `context.Context` as a first parameter. (#1357)
## [0.14.0] - 2020-11-19

View File

@ -49,7 +49,7 @@ func initProvider() func() {
// `localhost:30080` address. Otherwise, replace `localhost` with the
// address of your cluster. If you run the app inside k8s, then you can
// probably connect directly to the service through dns
exp, err := otlp.NewExporter(
exp, err := otlp.NewExporter(ctx,
otlp.WithInsecure(),
otlp.WithAddress("localhost:30080"),
otlp.WithGRPCDialOption(grpc.WithBlock()), // useful for testing

View File

@ -26,8 +26,8 @@ import (
func TestMain(m *testing.M) {
fields := []ottest.FieldOffset{
{
Name: "Exporter.lastConnectErrPtr",
Offset: unsafe.Offsetof(Exporter{}.lastConnectErrPtr),
Name: "grpcConnection.lastConnectErrPtr",
Offset: unsafe.Offsetof(grpcConnection{}.lastConnectErrPtr),
},
}
if !ottest.Aligned8Byte(fields, os.Stderr) {

View File

@ -15,52 +15,113 @@
package otlp // import "go.opentelemetry.io/otel/exporters/otlp"
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"unsafe"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
func (e *Exporter) lastConnectError() error {
errPtr := (*error)(atomic.LoadPointer(&e.lastConnectErrPtr))
type grpcConnection struct {
// Ensure pointer is 64-bit aligned for atomic operations on both 32 and 64 bit machines.
lastConnectErrPtr unsafe.Pointer
// mu protects the connection as it is accessed by the
// exporter goroutines and background connection goroutine
mu sync.Mutex
cc *grpc.ClientConn
// these fields are read-only after constructor is finished
c config
metadata metadata.MD
newConnectionHandler func(cc *grpc.ClientConn) error
// these channels are created once
disconnectedCh chan bool
backgroundConnectionDoneCh chan struct{}
stopCh chan struct{}
// this is for tests, so they can replace the closing
// routine without a worry of modifying some global variable
// or changing it back to original after the test is done
closeBackgroundConnectionDoneCh func(ch chan struct{})
}
func newGRPCConnection(c config, handler func(cc *grpc.ClientConn) error) *grpcConnection {
conn := new(grpcConnection)
conn.newConnectionHandler = handler
if c.collectorAddr == "" {
c.collectorAddr = fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort)
}
conn.c = c
if len(conn.c.headers) > 0 {
conn.metadata = metadata.New(conn.c.headers)
}
conn.closeBackgroundConnectionDoneCh = func(ch chan struct{}) {
close(ch)
}
return conn
}
func (oc *grpcConnection) startConnection(ctx context.Context) {
oc.stopCh = make(chan struct{})
oc.disconnectedCh = make(chan bool)
oc.backgroundConnectionDoneCh = make(chan struct{})
if err := oc.connect(ctx); err == nil {
oc.setStateConnected()
} else {
oc.setStateDisconnected(err)
}
go oc.indefiniteBackgroundConnection()
}
func (oc *grpcConnection) lastConnectError() error {
errPtr := (*error)(atomic.LoadPointer(&oc.lastConnectErrPtr))
if errPtr == nil {
return nil
}
return *errPtr
}
func (e *Exporter) saveLastConnectError(err error) {
func (oc *grpcConnection) saveLastConnectError(err error) {
var errPtr *error
if err != nil {
errPtr = &err
}
atomic.StorePointer(&e.lastConnectErrPtr, unsafe.Pointer(errPtr))
atomic.StorePointer(&oc.lastConnectErrPtr, unsafe.Pointer(errPtr))
}
func (e *Exporter) setStateDisconnected(err error) {
e.saveLastConnectError(err)
func (oc *grpcConnection) setStateDisconnected(err error) {
oc.saveLastConnectError(err)
select {
case e.disconnectedCh <- true:
case oc.disconnectedCh <- true:
default:
}
_ = oc.newConnectionHandler(nil)
}
func (e *Exporter) setStateConnected() {
e.saveLastConnectError(nil)
func (oc *grpcConnection) setStateConnected() {
oc.saveLastConnectError(nil)
}
func (e *Exporter) connected() bool {
return e.lastConnectError() == nil
func (oc *grpcConnection) connected() bool {
return oc.lastConnectError() == nil
}
const defaultConnReattemptPeriod = 10 * time.Second
func (e *Exporter) indefiniteBackgroundConnection() {
func (oc *grpcConnection) indefiniteBackgroundConnection() {
defer func() {
e.backgroundConnectionDoneCh <- true
oc.closeBackgroundConnectionDoneCh(oc.backgroundConnectionDoneCh)
}()
connReattemptPeriod := e.c.reconnectionPeriod
connReattemptPeriod := oc.c.reconnectionPeriod
if connReattemptPeriod <= 0 {
connReattemptPeriod = defaultConnReattemptPeriod
}
@ -79,17 +140,26 @@ func (e *Exporter) indefiniteBackgroundConnection() {
// 2. Otherwise block until we are disconnected, and
// then retry connecting
select {
case <-e.stopCh:
case <-oc.stopCh:
return
case <-e.disconnectedCh:
case <-oc.disconnectedCh:
// Quickly check if we haven't stopped at the
// same time.
select {
case <-oc.stopCh:
return
default:
}
// Normal scenario that we'll wait for
}
if err := e.connect(); err == nil {
e.setStateConnected()
if err := oc.connect(context.Background()); err == nil {
oc.setStateConnected()
} else {
e.setStateDisconnected(err)
oc.setStateDisconnected(err)
}
// Apply some jitter to avoid lockstep retrials of other
@ -97,17 +167,110 @@ func (e *Exporter) indefiniteBackgroundConnection() {
// innocent DDOS, by clogging the machine's resources and network.
jitter := time.Duration(rng.Int63n(maxJitterNanos))
select {
case <-e.stopCh:
case <-oc.stopCh:
return
case <-time.After(connReattemptPeriod + jitter):
}
}
}
func (e *Exporter) connect() error {
cc, err := e.dialToCollector()
func (oc *grpcConnection) connect(ctx context.Context) error {
cc, err := oc.dialToCollector(ctx)
if err != nil {
return err
}
return e.enableConnections(cc)
oc.setConnection(cc)
return oc.newConnectionHandler(cc)
}
// setConnection sets cc as the client connection and returns true if
// the connection state changed.
func (oc *grpcConnection) setConnection(cc *grpc.ClientConn) bool {
oc.mu.Lock()
defer oc.mu.Unlock()
// If previous clientConn is same as the current then just return.
// This doesn't happen right now as this func is only called with new ClientConn.
// It is more about future-proofing.
if oc.cc == cc {
return false
}
// If the previous clientConn was non-nil, close it
if oc.cc != nil {
_ = oc.cc.Close()
}
oc.cc = cc
return true
}
func (oc *grpcConnection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) {
addr := oc.c.collectorAddr
dialOpts := []grpc.DialOption{}
if oc.c.grpcServiceConfig != "" {
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(oc.c.grpcServiceConfig))
}
if oc.c.clientCredentials != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(oc.c.clientCredentials))
} else if oc.c.canDialInsecure {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
if oc.c.compressor != "" {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(oc.c.compressor)))
}
if len(oc.c.grpcDialOptions) != 0 {
dialOpts = append(dialOpts, oc.c.grpcDialOptions...)
}
ctx, cancel := oc.contextWithStop(ctx)
defer cancel()
ctx = oc.contextWithMetadata(ctx)
return grpc.DialContext(ctx, addr, dialOpts...)
}
func (oc *grpcConnection) contextWithMetadata(ctx context.Context) context.Context {
if oc.metadata.Len() > 0 {
return metadata.NewOutgoingContext(ctx, oc.metadata)
}
return ctx
}
func (oc *grpcConnection) shutdown(ctx context.Context) error {
close(oc.stopCh)
// Ensure that the backgroundConnector returns
select {
case <-oc.backgroundConnectionDoneCh:
case <-ctx.Done():
return ctx.Err()
}
close(oc.disconnectedCh)
oc.mu.Lock()
cc := oc.cc
oc.cc = nil
oc.mu.Unlock()
if cc != nil {
return cc.Close()
}
return nil
}
func (oc *grpcConnection) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) {
// Unify the parent context Done signal with the connection's
// stop channel.
ctx, cancel := context.WithCancel(ctx)
go func(ctx context.Context, cancel context.CancelFunc) {
select {
case <-ctx.Done():
// Nothing to do, either cancelled or deadline
// happened.
case <-oc.stopCh:
cancel()
}
}(ctx, cancel)
return ctx, cancel
}

View File

@ -28,12 +28,13 @@ import (
)
func Example_insecure() {
exp, err := otlp.NewExporter(otlp.WithInsecure())
ctx := context.Background()
exp, err := otlp.NewExporter(ctx, otlp.WithInsecure())
if err != nil {
log.Fatalf("Failed to create the collector exporter: %v", err)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
otel.Handle(err)
@ -54,7 +55,7 @@ func Example_insecure() {
tracer := otel.Tracer("test-tracer")
// Then use the OpenTelemetry tracing library, like we normally would.
ctx, span := tracer.Start(context.Background(), "CollectorExporter-Example")
ctx, span := tracer.Start(ctx, "CollectorExporter-Example")
defer span.End()
for i := 0; i < 10; i++ {
@ -72,12 +73,13 @@ func Example_withTLS() {
log.Fatalf("failed to create gRPC client TLS credentials: %v", err)
}
exp, err := otlp.NewExporter(otlp.WithTLSCredentials(creds))
ctx := context.Background()
exp, err := otlp.NewExporter(ctx, otlp.WithTLSCredentials(creds))
if err != nil {
log.Fatalf("failed to create the collector exporter: %v", err)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
otel.Handle(err)
@ -98,7 +100,7 @@ func Example_withTLS() {
tracer := otel.Tracer("test-tracer")
// Then use the OpenTelemetry tracing library, like we normally would.
ctx, span := tracer.Start(context.Background(), "Securely-Talking-To-Collector-Span")
ctx, span := tracer.Start(ctx, "Securely-Talking-To-Collector-Span")
defer span.End()
for i := 0; i < 10; i++ {

View File

@ -20,12 +20,9 @@ package otlp // import "go.opentelemetry.io/otel/exporters/otlp"
import (
"context"
"errors"
"fmt"
"sync"
"unsafe"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1"
@ -43,22 +40,16 @@ type Exporter struct {
// mu protects the non-atomic and non-channel variables
mu sync.RWMutex
// senderMu protects the concurrent unsafe sends on the shared gRPC client connection.
senderMu sync.Mutex
started bool
traceExporter coltracepb.TraceServiceClient
metricExporter colmetricpb.MetricsServiceClient
grpcClientConn *grpc.ClientConn
lastConnectErrPtr unsafe.Pointer
senderMu sync.Mutex
started bool
traceExporter coltracepb.TraceServiceClient
metricExporter colmetricpb.MetricsServiceClient
cc *grpcConnection
startOnce sync.Once
stopOnce sync.Once
stopCh chan struct{}
disconnectedCh chan bool
startOnce sync.Once
stopOnce sync.Once
backgroundConnectionDoneCh chan bool
c config
metadata metadata.MD
exportKindSelector metricsdk.ExportKindSelector
}
var _ tracesdk.SpanExporter = (*Exporter)(nil)
@ -82,9 +73,9 @@ func newConfig(opts ...ExporterOption) config {
}
// NewExporter constructs a new Exporter and starts it.
func NewExporter(opts ...ExporterOption) (*Exporter, error) {
func NewExporter(ctx context.Context, opts ...ExporterOption) (*Exporter, error) {
exp := NewUnstartedExporter(opts...)
if err := exp.Start(); err != nil {
if err := exp.Start(ctx); err != nil {
return nil, err
}
return exp, nil
@ -93,19 +84,29 @@ func NewExporter(opts ...ExporterOption) (*Exporter, error) {
// NewUnstartedExporter constructs a new Exporter and does not start it.
func NewUnstartedExporter(opts ...ExporterOption) *Exporter {
e := new(Exporter)
e.c = newConfig(opts...)
if len(e.c.headers) > 0 {
e.metadata = metadata.New(e.c.headers)
}
cfg := newConfig(opts...)
e.exportKindSelector = cfg.exportKindSelector
e.cc = newGRPCConnection(cfg, e.handleNewConnection)
return e
}
func (e *Exporter) handleNewConnection(cc *grpc.ClientConn) error {
e.mu.Lock()
defer e.mu.Unlock()
if cc != nil {
e.metricExporter = colmetricpb.NewMetricsServiceClient(cc)
e.traceExporter = coltracepb.NewTraceServiceClient(cc)
} else {
e.metricExporter = nil
e.traceExporter = nil
}
return nil
}
var (
errAlreadyStarted = errors.New("already started")
errNotStarted = errors.New("not started")
errDisconnected = errors.New("exporter disconnected")
errStopped = errors.New("exporter stopped")
errContextCanceled = errors.New("context canceled")
errNoClient = errors.New("no client")
errAlreadyStarted = errors.New("already started")
errDisconnected = errors.New("exporter disconnected")
)
// Start dials to the collector, establishing a connection to it. It also
@ -113,108 +114,25 @@ var (
// messages that consist of the node identifier. Start invokes a background
// connector that will reattempt connections to the collector periodically
// if the connection dies.
func (e *Exporter) Start() error {
func (e *Exporter) Start(ctx context.Context) error {
var err = errAlreadyStarted
e.startOnce.Do(func() {
e.mu.Lock()
e.started = true
e.disconnectedCh = make(chan bool, 1)
e.stopCh = make(chan struct{})
e.backgroundConnectionDoneCh = make(chan bool)
e.mu.Unlock()
// An optimistic first connection attempt to ensure that
// applications under heavy load can immediately process
// data. See https://github.com/census-ecosystem/opencensus-go-exporter-ocagent/pull/63
if err := e.connect(); err == nil {
e.setStateConnected()
} else {
e.setStateDisconnected(err)
}
go e.indefiniteBackgroundConnection()
err = nil
e.cc.startConnection(ctx)
})
return err
}
func (e *Exporter) prepareCollectorAddress() string {
if e.c.collectorAddr != "" {
return e.c.collectorAddr
}
return fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort)
}
func (e *Exporter) enableConnections(cc *grpc.ClientConn) error {
e.mu.RLock()
started := e.started
e.mu.RUnlock()
if !started {
return errNotStarted
}
e.mu.Lock()
// If previous clientConn is same as the current then just return.
// This doesn't happen right now as this func is only called with new ClientConn.
// It is more about future-proofing.
if e.grpcClientConn == cc {
e.mu.Unlock()
return nil
}
// If the previous clientConn was non-nil, close it
if e.grpcClientConn != nil {
_ = e.grpcClientConn.Close()
}
e.grpcClientConn = cc
e.traceExporter = coltracepb.NewTraceServiceClient(cc)
e.metricExporter = colmetricpb.NewMetricsServiceClient(cc)
e.mu.Unlock()
return nil
}
func (e *Exporter) contextWithMetadata(ctx context.Context) context.Context {
if e.metadata.Len() > 0 {
return metadata.NewOutgoingContext(ctx, e.metadata)
}
return ctx
}
func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) {
addr := e.prepareCollectorAddress()
dialOpts := []grpc.DialOption{}
if e.c.grpcServiceConfig != "" {
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(e.c.grpcServiceConfig))
}
if e.c.clientCredentials != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(e.c.clientCredentials))
} else if e.c.canDialInsecure {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
if e.c.compressor != "" {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(e.c.compressor)))
}
if len(e.c.grpcDialOptions) != 0 {
dialOpts = append(dialOpts, e.c.grpcDialOptions...)
}
ctx := e.contextWithMetadata(context.Background())
return grpc.DialContext(ctx, addr, dialOpts...)
}
// closeStopCh is used to wrap the exporters stopCh channel closing for testing.
var closeStopCh = func(stopCh chan struct{}) {
close(stopCh)
}
// Shutdown closes all connections and releases resources currently being used
// by the exporter. If the exporter is not started this does nothing.
func (e *Exporter) Shutdown(ctx context.Context) error {
e.mu.RLock()
cc := e.grpcClientConn
cc := e.cc
started := e.started
e.mu.RUnlock()
@ -225,23 +143,13 @@ func (e *Exporter) Shutdown(ctx context.Context) error {
var err error
e.stopOnce.Do(func() {
if cc != nil {
// Clean things up before checking this error.
err = cc.Close()
}
// Clean things up before checking this error.
err = cc.shutdown(ctx)
// At this point we can change the state variable started
e.mu.Lock()
e.started = false
e.mu.Unlock()
closeStopCh(e.stopCh)
// Ensure that the backgroundConnector returns
select {
case <-e.backgroundConnectionDoneCh:
case <-ctx.Done():
err = ctx.Err()
}
})
return err
@ -251,16 +159,8 @@ func (e *Exporter) Shutdown(ctx context.Context) error {
// interface. It transforms and batches metric Records into OTLP Metrics and
// transmits them to the configured collector.
func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error {
// Unify the parent context Done signal with the exporter stopCh.
ctx, cancel := context.WithCancel(parent)
ctx, cancel := e.cc.contextWithStop(parent)
defer cancel()
go func(ctx context.Context, cancel context.CancelFunc) {
select {
case <-ctx.Done():
case <-e.stopCh:
cancel()
}
}(ctx, cancel)
// Hardcode the number of worker goroutines to 1. We later will
// need to see if there's a way to adjust that number for longer
@ -270,32 +170,31 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e
return err
}
if !e.connected() {
if !e.cc.connected() {
return errDisconnected
}
select {
case <-e.stopCh:
return errStopped
case <-ctx.Done():
return errContextCanceled
default:
err = func() error {
e.senderMu.Lock()
_, err := e.metricExporter.Export(e.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{
defer e.senderMu.Unlock()
if e.metricExporter == nil {
return errNoClient
}
_, err := e.metricExporter.Export(e.cc.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: rms,
})
e.senderMu.Unlock()
if err != nil {
return err
}
return err
}()
if err != nil {
e.cc.setStateDisconnected(err)
}
return nil
return err
}
// ExportKindFor reports back to the OpenTelemetry SDK sending this Exporter
// metric telemetry that it needs to be provided in a cumulative format.
func (e *Exporter) ExportKindFor(desc *metric.Descriptor, kind aggregation.Kind) metricsdk.ExportKind {
return e.c.exportKindSelector.ExportKindFor(desc, kind)
return e.exportKindSelector.ExportKindFor(desc, kind)
}
// ExportSpans exports a batch of SpanData.
@ -304,28 +203,31 @@ func (e *Exporter) ExportSpans(ctx context.Context, sds []*tracesdk.SpanData) er
}
func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) error {
select {
case <-e.stopCh:
ctx, cancel := e.cc.contextWithStop(ctx)
defer cancel()
if !e.cc.connected() {
return nil
default:
if !e.connected() {
return nil
}
}
protoSpans := transform.SpanData(sdl)
if len(protoSpans) == 0 {
return nil
}
protoSpans := transform.SpanData(sdl)
if len(protoSpans) == 0 {
return nil
}
err := func() error {
e.senderMu.Lock()
_, err := e.traceExporter.Export(e.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{
defer e.senderMu.Unlock()
if e.traceExporter == nil {
return errNoClient
}
_, err := e.traceExporter.Export(e.cc.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{
ResourceSpans: protoSpans,
})
e.senderMu.Unlock()
if err != nil {
e.setStateDisconnected(err)
return err
}
return err
}()
if err != nil {
e.cc.setStateDisconnected(err)
}
return nil
return err
}

View File

@ -72,12 +72,13 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
}
opts = append(opts, additionalOpts...)
exp, err := otlp.NewExporter(opts...)
ctx := context.Background()
exp, err := otlp.NewExporter(ctx, opts...)
if err != nil {
t.Fatalf("failed to create a new collector exporter: %v", err)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
panic(err)
@ -110,11 +111,11 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
// Now create few spans
m := 4
for i := 0; i < m; i++ {
_, span := tr1.Start(context.Background(), "AlwaysSample")
_, span := tr1.Start(ctx, "AlwaysSample")
span.SetAttributes(label.Int64("i", int64(i)))
span.End()
_, span = tr2.Start(context.Background(), "AlwaysSample")
_, span = tr2.Start(ctx, "AlwaysSample")
span.SetAttributes(label.Int64("i", int64(i)))
span.End()
}
@ -124,7 +125,6 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
pusher := push.New(processor, exp)
pusher.Start()
ctx := context.Background()
meter := pusher.MeterProvider().Meter("test-meter")
labels := []label.KeyValue{label.Bool("test", true)}
@ -190,7 +190,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
<-time.After(40 * time.Millisecond)
// Now shutdown the exporter
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
ctx, cancel := context.WithTimeout(ctx, time.Millisecond)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf("failed to stop the exporter: %v", err)
@ -307,31 +307,33 @@ func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
_ = mc.stop()
}()
exp, err := otlp.NewExporter(otlp.WithInsecure(),
ctx := context.Background()
exp, err := otlp.NewExporter(ctx,
otlp.WithInsecure(),
otlp.WithReconnectionPeriod(50*time.Millisecond),
otlp.WithAddress(mc.address))
if err != nil {
t.Fatalf("error creating exporter: %v", err)
}
defer func() {
if err := exp.Shutdown(context.Background()); err != nil {
if err := exp.Shutdown(ctx); err != nil {
panic(err)
}
}()
// Invoke Start numerous times, should return errAlreadyStarted
for i := 0; i < 10; i++ {
if err := exp.Start(); err == nil || !strings.Contains(err.Error(), "already started") {
if err := exp.Start(ctx); err == nil || !strings.Contains(err.Error(), "already started") {
t.Fatalf("#%d unexpected Start error: %v", i, err)
}
}
if err := exp.Shutdown(context.Background()); err != nil {
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf("failed to Shutdown the exporter: %v", err)
}
// Invoke Shutdown numerous times
for i := 0; i < 10; i++ {
if err := exp.Shutdown(context.Background()); err != nil {
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf(`#%d got error (%v) expected none`, i, err)
}
}
@ -341,14 +343,16 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
mc := runMockCol(t)
reconnectionPeriod := 20 * time.Millisecond
exp, err := otlp.NewExporter(otlp.WithInsecure(),
ctx := context.Background()
exp, err := otlp.NewExporter(ctx,
otlp.WithInsecure(),
otlp.WithAddress(mc.address),
otlp.WithReconnectionPeriod(reconnectionPeriod))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer func() {
_ = exp.Shutdown(context.Background())
_ = exp.Shutdown(ctx)
}()
// We'll now stop the collector right away to simulate a connection
@ -363,7 +367,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
// No endpoint up.
require.Error(
t,
exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "in the midst"}}),
exp.ExportSpans(ctx, []*exporttrace.SpanData{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused",
mc.address,
)
@ -377,7 +381,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
n := 10
for i := 0; i < n; i++ {
require.NoError(t, exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "Resurrected"}}))
require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanData{{Name: "Resurrected"}}))
}
nmaSpans := nmc.getSpans()
@ -412,13 +416,15 @@ func TestNewExporter_collectorOnBadConnection(t *testing.T) {
_, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String())
address := fmt.Sprintf("localhost:%s", collectorPortStr)
exp, err := otlp.NewExporter(otlp.WithInsecure(),
ctx := context.Background()
exp, err := otlp.NewExporter(ctx,
otlp.WithInsecure(),
otlp.WithReconnectionPeriod(50*time.Millisecond),
otlp.WithAddress(address))
if err != nil {
t.Fatalf("Despite an indefinite background reconnection, got error: %v", err)
}
_ = exp.Shutdown(context.Background())
_ = exp.Shutdown(ctx)
}
func TestNewExporter_withAddress(t *testing.T) {
@ -432,11 +438,12 @@ func TestNewExporter_withAddress(t *testing.T) {
otlp.WithReconnectionPeriod(50*time.Millisecond),
otlp.WithAddress(mc.address))
ctx := context.Background()
defer func() {
_ = exp.Shutdown(context.Background())
_ = exp.Shutdown(ctx)
}()
if err := exp.Start(); err != nil {
if err := exp.Start(ctx); err != nil {
t.Fatalf("Unexpected Start error: %v", err)
}
}
@ -447,16 +454,17 @@ func TestNewExporter_withHeaders(t *testing.T) {
_ = mc.stop()
}()
exp, _ := otlp.NewExporter(
ctx := context.Background()
exp, _ := otlp.NewExporter(ctx,
otlp.WithInsecure(),
otlp.WithReconnectionPeriod(50*time.Millisecond),
otlp.WithAddress(mc.address),
otlp.WithHeaders(map[string]string{"header1": "value1"}),
)
require.NoError(t, exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "in the midst"}}))
require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanData{{Name: "in the midst"}}))
defer func() {
_ = exp.Shutdown(context.Background())
_ = exp.Shutdown(ctx)
}()
headers := mc.getHeaders()
@ -473,14 +481,15 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) {
<-time.After(5 * time.Millisecond)
exp, _ := otlp.NewExporter(
ctx := context.Background()
exp, _ := otlp.NewExporter(ctx,
otlp.WithInsecure(),
otlp.WithReconnectionPeriod(50*time.Millisecond),
otlp.WithAddress(mc.address),
)
defer func() {
_ = exp.Shutdown(context.Background())
_ = exp.Shutdown(ctx)
}()
tp := sdktrace.NewTracerProvider(
@ -492,7 +501,7 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) {
sdktrace.WithMaxExportBatchSize(10),
),
)
defer func() { _ = tp.Shutdown(context.Background()) }()
defer func() { _ = tp.Shutdown(ctx) }()
tr := tp.Tracer("test-tracer")
testKvs := []label.KeyValue{
@ -504,7 +513,7 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) {
label.Bool("Bool", true),
label.String("String", "test"),
}
_, span := tr.Start(context.Background(), "AlwaysSample")
_, span := tr.Start(ctx, "AlwaysSample")
span.SetAttributes(testKvs...)
span.End()
@ -520,7 +529,7 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) {
<-time.After(40 * time.Millisecond)
// Now shutdown the exporter
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
ctx, cancel := context.WithTimeout(ctx, time.Millisecond)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf("failed to stop the exporter: %v", err)

View File

@ -23,21 +23,18 @@ import (
)
func TestExporterShutdownHonorsTimeout(t *testing.T) {
orig := closeStopCh
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer func() {
cancel()
closeStopCh = orig
}()
closeStopCh = func(stopCh chan struct{}) {
go func() {
<-ctx.Done()
close(stopCh)
}()
}
defer cancel()
e := NewUnstartedExporter()
if err := e.Start(); err != nil {
orig := e.cc.closeBackgroundConnectionDoneCh
e.cc.closeBackgroundConnectionDoneCh = func(ch chan struct{}) {
go func() {
<-ctx.Done()
orig(ch)
}()
}
if err := e.Start(ctx); err != nil {
t.Fatalf("failed to start exporter: %v", err)
}
@ -51,21 +48,18 @@ func TestExporterShutdownHonorsTimeout(t *testing.T) {
}
func TestExporterShutdownHonorsCancel(t *testing.T) {
orig := closeStopCh
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer func() {
cancel()
closeStopCh = orig
}()
closeStopCh = func(stopCh chan struct{}) {
go func() {
<-ctx.Done()
close(stopCh)
}()
}
defer cancel()
e := NewUnstartedExporter()
if err := e.Start(); err != nil {
orig := e.cc.closeBackgroundConnectionDoneCh
e.cc.closeBackgroundConnectionDoneCh = func(ch chan struct{}) {
go func() {
<-ctx.Done()
orig(ch)
}()
}
if err := e.Start(ctx); err != nil {
t.Fatalf("failed to start exporter: %v", err)
}
@ -84,7 +78,7 @@ func TestExporterShutdownNoError(t *testing.T) {
defer cancel()
e := NewUnstartedExporter()
if err := e.Start(); err != nil {
if err := e.Start(ctx); err != nil {
t.Fatalf("failed to start exporter: %v", err)
}
@ -95,7 +89,7 @@ func TestExporterShutdownNoError(t *testing.T) {
func TestExporterShutdownManyTimes(t *testing.T) {
ctx := context.Background()
e, err := NewExporter()
e, err := NewExporter(ctx)
if err != nil {
t.Fatalf("failed to start an exporter: %v", err)
}