1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-02-17 14:01:52 +02:00

Some cleanups in otlp exporter ()

* Drop WorkerCount option

This is not a good option - the user isn't likely to know how many
worker goroutines is optimal. This should be something that an
exporter should figure out itself. The second problem with the option
is that it is specific to the metric transformation from SDK export
format into protobuf. When the exporter starts supporting other
protocols (HTTP/JSON for example), this option may be of no use. So
the option should rather belong to the protocol, not to the
exporter. Currently both mean the same, but later they will be
separated, and this option breaks the separation.

* Make stop channel a typical signalling channel

Signalling channels are idiomatically defined as chan struct{}, so
let's make it so, to avoid confusion about the meaning of the bool
type.

* Close a race when grpc connection is closed multiple times

If several goroutines call Shutdown at the same time, then the
following scenario is possible:

goroutine A locks a mutex, reads a started member, unlocks the mutex
and gets preempted

goroutine B locks a mutex, reads a started member, unlocks the mutex
and gets preempted

goroutine A does not return early in the "if !started" conditional and
continues to close the connection and execute the rest of the function
(where it finally sets the started member to false), gets preempted

goroutine B also does not return early, because it got a copy of
started before goroutine A set it to false, so it tries to close the
connection again.

* Update changelog
This commit is contained in:
Krzesimir Nowak 2020-11-20 03:03:25 +01:00 committed by GitHub
parent 5b5b4ab1ea
commit 6eb68013b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 58 additions and 46 deletions

@ -59,6 +59,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `AddEventWithTimestamp` method on the `Span` interface in `go.opentelemetry.io/otel` is removed due to its redundancy. - The `AddEventWithTimestamp` method on the `Span` interface in `go.opentelemetry.io/otel` is removed due to its redundancy.
It is replaced by using the `AddEvent` method with a `WithTimestamp` option. (#1254) It is replaced by using the `AddEvent` method with a `WithTimestamp` option. (#1254)
- Structs `MockSpan` and `MockTracer` are removed from `go.opentelemetry.io/otel/oteltest`. `Tracer` and `Span` from the same module should be used in their place instead. (#1306) - Structs `MockSpan` and `MockTracer` are removed from `go.opentelemetry.io/otel/oteltest`. `Tracer` and `Span` from the same module should be used in their place instead. (#1306)
- `WorkerCount` option is removed from `go.opentelemetry.io/otel/exporters/otlp`. (#1350)
### Fixed ### Fixed

@ -30,10 +30,6 @@ const (
// DefaultCollectorHost is the host address the Exporter will attempt // DefaultCollectorHost is the host address the Exporter will attempt
// connect to if no collector address is provided. // connect to if no collector address is provided.
DefaultCollectorHost string = "localhost" DefaultCollectorHost string = "localhost"
// DefaultNumWorkers is the number of goroutines the Exporter will use when
// processing telemetry.
DefaultNumWorkers uint = 1
// DefaultGRPCServiceConfig is the gRPC service config used if none is // DefaultGRPCServiceConfig is the gRPC service config used if none is
// provided by the user. // provided by the user.
// //
@ -84,20 +80,9 @@ type config struct {
grpcDialOptions []grpc.DialOption grpcDialOptions []grpc.DialOption
headers map[string]string headers map[string]string
clientCredentials credentials.TransportCredentials clientCredentials credentials.TransportCredentials
numWorkers uint
exportKindSelector metricsdk.ExportKindSelector exportKindSelector metricsdk.ExportKindSelector
} }
// WorkerCount sets the number of Goroutines to use when processing telemetry.
func WorkerCount(n uint) ExporterOption {
if n == 0 {
n = DefaultNumWorkers
}
return func(cfg *config) {
cfg.numWorkers = n
}
}
// WithInsecure disables client transport security for the exporter's gRPC connection // WithInsecure disables client transport security for the exporter's gRPC connection
// just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure // just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure
// does. Note, by default, client security is required unless WithInsecure is used. // does. Note, by default, client security is required unless WithInsecure is used.

@ -51,7 +51,8 @@ type Exporter struct {
lastConnectErrPtr unsafe.Pointer lastConnectErrPtr unsafe.Pointer
startOnce sync.Once startOnce sync.Once
stopCh chan bool stopOnce sync.Once
stopCh chan struct{}
disconnectedCh chan bool disconnectedCh chan bool
backgroundConnectionDoneCh chan bool backgroundConnectionDoneCh chan bool
@ -67,7 +68,6 @@ var _ metricsdk.Exporter = (*Exporter)(nil)
// any ExporterOptions provided. // any ExporterOptions provided.
func newConfig(opts ...ExporterOption) config { func newConfig(opts ...ExporterOption) config {
cfg := config{ cfg := config{
numWorkers: DefaultNumWorkers,
grpcServiceConfig: DefaultGRPCServiceConfig, grpcServiceConfig: DefaultGRPCServiceConfig,
// Note: the default ExportKindSelector is specified // Note: the default ExportKindSelector is specified
@ -119,7 +119,7 @@ func (e *Exporter) Start() error {
e.mu.Lock() e.mu.Lock()
e.started = true e.started = true
e.disconnectedCh = make(chan bool, 1) e.disconnectedCh = make(chan bool, 1)
e.stopCh = make(chan bool) e.stopCh = make(chan struct{})
e.backgroundConnectionDoneCh = make(chan bool) e.backgroundConnectionDoneCh = make(chan bool)
e.mu.Unlock() e.mu.Unlock()
@ -206,7 +206,7 @@ func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) {
} }
// closeStopCh is used to wrap the exporters stopCh channel closing for testing. // closeStopCh is used to wrap the exporters stopCh channel closing for testing.
var closeStopCh = func(stopCh chan bool) { var closeStopCh = func(stopCh chan struct{}) {
close(stopCh) close(stopCh)
} }
@ -223,23 +223,26 @@ func (e *Exporter) Shutdown(ctx context.Context) error {
} }
var err error var err error
if cc != nil {
// Clean things up before checking this error.
err = cc.Close()
}
// At this point we can change the state variable started e.stopOnce.Do(func() {
e.mu.Lock() if cc != nil {
e.started = false // Clean things up before checking this error.
e.mu.Unlock() err = cc.Close()
closeStopCh(e.stopCh) }
// Ensure that the backgroundConnector returns // At this point we can change the state variable started
select { e.mu.Lock()
case <-e.backgroundConnectionDoneCh: e.started = false
case <-ctx.Done(): e.mu.Unlock()
return ctx.Err() closeStopCh(e.stopCh)
}
// Ensure that the backgroundConnector returns
select {
case <-e.backgroundConnectionDoneCh:
case <-ctx.Done():
err = ctx.Err()
}
})
return err return err
} }
@ -259,7 +262,10 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e
} }
}(ctx, cancel) }(ctx, cancel)
rms, err := transform.CheckpointSet(ctx, e, cps, e.c.numWorkers) // Hardcode the number of worker goroutines to 1. We later will
// need to see if there's a way to adjust that number for longer
// running operations.
rms, err := transform.CheckpointSet(ctx, e, cps, 1)
if err != nil { if err != nil {
return err return err
} }

@ -767,15 +767,7 @@ func TestStatelessExportKind(t *testing.T) {
// What works single-threaded should work multi-threaded // What works single-threaded should work multi-threaded
func runMetricExportTests(t *testing.T, opts []ExporterOption, rs []record, expected []metricpb.ResourceMetrics) { func runMetricExportTests(t *testing.T, opts []ExporterOption, rs []record, expected []metricpb.ResourceMetrics) {
t.Run("1 goroutine", func(t *testing.T) { exp := NewUnstartedExporter(opts...)
runMetricExportTest(t, NewUnstartedExporter(append(opts[:len(opts):len(opts)], WorkerCount(1))...), rs, expected)
})
t.Run("20 goroutines", func(t *testing.T) {
runMetricExportTest(t, NewUnstartedExporter(append(opts[:len(opts):len(opts)], WorkerCount(20))...), rs, expected)
})
}
func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []metricpb.ResourceMetrics) {
msc := &metricsServiceClientStub{} msc := &metricsServiceClientStub{}
exp.metricExporter = msc exp.metricExporter = msc
exp.started = true exp.started = true

@ -17,6 +17,7 @@ package otlp
import ( import (
"context" "context"
"errors" "errors"
"sync"
"testing" "testing"
"time" "time"
) )
@ -28,7 +29,7 @@ func TestExporterShutdownHonorsTimeout(t *testing.T) {
cancel() cancel()
closeStopCh = orig closeStopCh = orig
}() }()
closeStopCh = func(stopCh chan bool) { closeStopCh = func(stopCh chan struct{}) {
go func() { go func() {
<-ctx.Done() <-ctx.Done()
close(stopCh) close(stopCh)
@ -56,7 +57,7 @@ func TestExporterShutdownHonorsCancel(t *testing.T) {
cancel() cancel()
closeStopCh = orig closeStopCh = orig
}() }()
closeStopCh = func(stopCh chan bool) { closeStopCh = func(stopCh chan struct{}) {
go func() { go func() {
<-ctx.Done() <-ctx.Done()
close(stopCh) close(stopCh)
@ -91,3 +92,30 @@ func TestExporterShutdownNoError(t *testing.T) {
t.Errorf("shutdown errored: expected nil, got %v", err) t.Errorf("shutdown errored: expected nil, got %v", err)
} }
} }
func TestExporterShutdownManyTimes(t *testing.T) {
ctx := context.Background()
e, err := NewExporter()
if err != nil {
t.Fatalf("failed to start an exporter: %v", err)
}
ch := make(chan struct{})
wg := sync.WaitGroup{}
const num int = 20
wg.Add(num)
errs := make([]error, num)
for i := 0; i < num; i++ {
go func(idx int) {
defer wg.Done()
<-ch
errs[idx] = e.Shutdown(ctx)
}(i)
}
close(ch)
wg.Wait()
for _, err := range errs {
if err != nil {
t.Fatalf("failed to shutdown exporter: %v", err)
}
}
}