mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2024-12-04 09:43:23 +02:00
Remove locking from Jaeger exporter shutdown/export (#1807)
This commit is contained in:
parent
4f9fec29da
commit
2de86f23c3
@ -19,7 +19,6 @@ import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"google.golang.org/api/support/bundler"
|
||||
|
||||
@ -115,8 +114,10 @@ func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, e
|
||||
return nil, fmt.Errorf("failed to get service name from default resource")
|
||||
}
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
e := &Exporter{
|
||||
uploader: uploader,
|
||||
stopCh: stopCh,
|
||||
defaultServiceName: defaultServiceName,
|
||||
}
|
||||
bundler := bundler.NewBundler((*sdktrace.SpanSnapshot)(nil), func(bundle interface{}) {
|
||||
@ -180,8 +181,7 @@ type Exporter struct {
|
||||
bundler *bundler.Bundler
|
||||
uploader batchUploader
|
||||
|
||||
stoppedMu sync.RWMutex
|
||||
stopped bool
|
||||
stopCh chan struct{}
|
||||
|
||||
defaultServiceName string
|
||||
}
|
||||
@ -190,13 +190,27 @@ var _ sdktrace.SpanExporter = (*Exporter)(nil)
|
||||
|
||||
// ExportSpans exports SpanSnapshots to Jaeger.
|
||||
func (e *Exporter) ExportSpans(ctx context.Context, ss []*sdktrace.SpanSnapshot) error {
|
||||
e.stoppedMu.RLock()
|
||||
stopped := e.stopped
|
||||
e.stoppedMu.RUnlock()
|
||||
if stopped {
|
||||
// Return fast if context is already canceled or Exporter shutdown.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-e.stopCh:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
// Cancel export if Exporter is shutdown.
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
go func(ctx context.Context, cancel context.CancelFunc) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-e.stopCh:
|
||||
cancel()
|
||||
}
|
||||
}(ctx, cancel)
|
||||
|
||||
for _, span := range ss {
|
||||
// TODO(jbd): Handle oversized bundlers.
|
||||
err := e.bundler.AddWait(ctx, span, 1)
|
||||
@ -220,9 +234,8 @@ var flush = func(e *Exporter) {
|
||||
|
||||
// Shutdown stops the exporter flushing any pending exports.
|
||||
func (e *Exporter) Shutdown(ctx context.Context) error {
|
||||
e.stoppedMu.Lock()
|
||||
e.stopped = true
|
||||
e.stoppedMu.Unlock()
|
||||
// Stop any active and subsequent exports.
|
||||
close(e.stopCh)
|
||||
|
||||
done := make(chan struct{}, 1)
|
||||
// Shadow so if the goroutine is leaked in testing it doesn't cause a race
|
||||
@ -408,6 +421,12 @@ func getBoolTag(k string, b bool) *gen.Tag {
|
||||
//
|
||||
// This is useful if your program is ending and you do not want to lose recent spans.
|
||||
func (e *Exporter) Flush() {
|
||||
// Return fast if Exporter shutdown.
|
||||
select {
|
||||
case <-e.stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
flush(e)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user