1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-10 00:29:12 +02:00
opentelemetry-go/sdk/trace/batch_span_processor.go
Tyler Yahn 422188a85f
Correct SDK trace Exporter interface (#1078)
* Update trace export interface

Move to conforming to the specification.

* Update documentation in export trace

* Update sdk trace provider to support new trace exporter

* Update SpanProcessors

Support the Provider changes and new trace exporter.

* Update the SDK to support the changes

* Update trace Provider to not return an error

* Update sdk with new Provider return

Also fix the testExporter ExportSpans method

* Update exporters with changes

* Update examples with changes

* Update Changelog

* Move error handling to end of shutdown

* Update exporter interface

Rename to SpanExporter to match specification. Add an error return value
to the Shutdown method based on feedback. Propagate these changes.

Remove the Stop method from the OTLP exporter to avoid confusion and
redundancy.

* Add test to check OTLP Shutdown honors context

* Add Jaeger exporter test for shutdown

* Fix race in Jaeger test

* Unify shutdown behavior and testing

* Update sdk/trace/simple_span_processor.go

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
2020-09-09 10:19:03 -07:00

252 lines
6.4 KiB
Go

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package trace
import (
"context"
"runtime"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel/api/global"
export "go.opentelemetry.io/otel/sdk/export/trace"
)
// Default settings used by NewBatchSpanProcessor for any option that the
// caller does not override.
const (
	// DefaultMaxQueueSize is the default capacity of the span queue.
	DefaultMaxQueueSize = 2048

	// DefaultBatchTimeout is the default maximum time spent building a batch
	// before the spans collected so far are exported.
	DefaultBatchTimeout = 5 * time.Second

	// DefaultMaxExportBatchSize is the default maximum number of spans sent
	// to the exporter in a single batch.
	DefaultMaxExportBatchSize = 512
)
// BatchSpanProcessorOption is a function that modifies a
// BatchSpanProcessorOptions value. NewBatchSpanProcessor applies the supplied
// options, in order, on top of the default settings.
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
// BatchSpanProcessorOptions holds the configuration of a BatchSpanProcessor.
type BatchSpanProcessorOptions struct {
// MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the
// queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior.
// The default value of MaxQueueSize is 2048.
MaxQueueSize int
// BatchTimeout is the maximum duration for constructing a batch. Processor
// forcefully sends available spans when timeout is reached.
// The default value of BatchTimeout is 5000 msec.
BatchTimeout time.Duration
// MaxExportBatchSize is the maximum number of spans to process in a single batch.
// If there are more than one batch worth of spans then it processes multiple batches
// of spans one batch after the other without any delay.
// The default value of MaxExportBatchSize is 512.
MaxExportBatchSize int
// BlockOnQueueFull makes OnEnd block while the queue is full instead of
// dropping the span. OnStart is not affected because it never enqueues
// anything.
// Blocking option should be used carefully as it can severely affect the performance of an
// application.
BlockOnQueueFull bool
}
// BatchSpanProcessor is a SpanProcessor that batches asynchronously received
// SpanData and sends it to an export.SpanExporter when a batch is complete
// or the batch timeout elapses.
type BatchSpanProcessor struct {
// e is the exporter batches are sent to. When it is nil OnEnd enqueues nothing.
e export.SpanExporter
// o holds the options the processor was created with.
o BatchSpanProcessorOptions
// queue buffers ended spans until the worker goroutine batches them.
queue chan *export.SpanData
// dropped counts spans discarded because the queue was full; it is
// updated with atomic operations.
dropped uint32
// batch accumulates spans for the next export and is truncated to length
// zero after each export.
batch []*export.SpanData
// timer fires when BatchTimeout has elapsed, triggering an export.
timer *time.Timer
// stopWait lets Shutdown wait for the worker goroutine to finish.
stopWait sync.WaitGroup
// stopOnce ensures the shutdown sequence runs only once.
stopOnce sync.Once
// stopCh is closed by Shutdown to tell the worker goroutine to stop.
stopCh chan struct{}
}
// Compile-time check that *BatchSpanProcessor implements SpanProcessor.
var _ SpanProcessor = (*BatchSpanProcessor)(nil)
// NewBatchSpanProcessor creates a new BatchSpanProcessor that will send
// SpanData batches to the exporter with the supplied options.
//
// Options are applied in order on top of the defaults (DefaultBatchTimeout,
// DefaultMaxQueueSize and DefaultMaxExportBatchSize). A negative MaxQueueSize
// or MaxExportBatchSize cannot be used to allocate the queue or the batch
// buffer (make panics on a negative size), so such a value is replaced by the
// corresponding default instead of crashing the caller.
//
// The returned BatchSpanProcessor needs to be registered with the SDK using
// the RegisterSpanProcessor method for it to process spans.
//
// If the exporter is nil, the span processor will perform no action.
//
// A worker goroutine is started that batches queued spans until Shutdown is
// called and then drains whatever is left in the queue.
func NewBatchSpanProcessor(exporter export.SpanExporter, options ...BatchSpanProcessorOption) *BatchSpanProcessor {
	o := BatchSpanProcessorOptions{
		BatchTimeout:       DefaultBatchTimeout,
		MaxQueueSize:       DefaultMaxQueueSize,
		MaxExportBatchSize: DefaultMaxExportBatchSize,
	}
	for _, opt := range options {
		opt(&o)
	}

	// Guard the allocations below: make panics when given a negative size.
	if o.MaxQueueSize < 0 {
		o.MaxQueueSize = DefaultMaxQueueSize
	}
	if o.MaxExportBatchSize < 0 {
		o.MaxExportBatchSize = DefaultMaxExportBatchSize
	}

	bsp := &BatchSpanProcessor{
		e:      exporter,
		o:      o,
		batch:  make([]*export.SpanData, 0, o.MaxExportBatchSize),
		timer:  time.NewTimer(o.BatchTimeout),
		queue:  make(chan *export.SpanData, o.MaxQueueSize),
		stopCh: make(chan struct{}),
	}

	// Shutdown waits on stopWait, so register the worker before starting it.
	bsp.stopWait.Add(1)
	go func() {
		defer bsp.stopWait.Done()
		bsp.processQueue()
		bsp.drainQueue()
	}()

	return bsp
}
// OnStart does nothing: this processor only acts on spans once they have
// ended (see OnEnd).
func (bsp *BatchSpanProcessor) OnStart(sd *export.SpanData) {}
// OnEnd hands the export.SpanData of an ended span to the queue so that it
// can be exported later as part of a batch. A processor created without an
// exporter would only drop the data, so in that case nothing is enqueued.
func (bsp *BatchSpanProcessor) OnEnd(sd *export.SpanData) {
	if bsp.e != nil {
		bsp.enqueue(sd)
	}
}
// Shutdown signals the worker goroutine to stop and blocks until it has
// finished, which includes exporting the spans still sitting in the queue.
// Only the first call has any effect; later calls do nothing.
func (bsp *BatchSpanProcessor) Shutdown() {
	stop := func() {
		// Closing stopCh ends processQueue; the worker then drains the queue.
		close(bsp.stopCh)
		bsp.stopWait.Wait()
	}
	bsp.stopOnce.Do(stop)
}
// WithMaxQueueSize returns an option that sets MaxQueueSize to size.
func WithMaxQueueSize(size int) BatchSpanProcessorOption {
	return func(opts *BatchSpanProcessorOptions) { opts.MaxQueueSize = size }
}
// WithMaxExportBatchSize returns an option that sets MaxExportBatchSize to
// size.
func WithMaxExportBatchSize(size int) BatchSpanProcessorOption {
	return func(opts *BatchSpanProcessorOptions) { opts.MaxExportBatchSize = size }
}
// WithBatchTimeout returns an option that sets BatchTimeout to delay.
func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption {
	return func(opts *BatchSpanProcessorOptions) { opts.BatchTimeout = delay }
}
// WithBlocking returns an option that sets BlockOnQueueFull to true, so that
// enqueueing a span blocks while the queue is full instead of dropping it.
func WithBlocking() BatchSpanProcessorOption {
	return func(opts *BatchSpanProcessorOptions) { opts.BlockOnQueueFull = true }
}
// exportSpans re-arms the batch timer and, if any spans have been collected,
// sends the current batch to the exporter. An export error is passed to the
// global error handler. The batch is emptied afterwards whether or not the
// export succeeded, keeping its backing array for reuse.
//
// It is shared by processQueue and drainQueue.
func (bsp *BatchSpanProcessor) exportSpans() {
	bsp.timer.Reset(bsp.o.BatchTimeout)

	if len(bsp.batch) == 0 {
		return
	}

	err := bsp.e.ExportSpans(context.Background(), bsp.batch)
	if err != nil {
		global.Handle(err)
	}
	bsp.batch = bsp.batch[:0]
}
// processQueue is the main loop of the worker goroutine. Until stopCh is
// closed it moves spans from the queue into the current batch and exports
// the batch either when it holds MaxExportBatchSize spans or when the batch
// timer fires, whichever happens first. The timer is stopped on return.
func (bsp *BatchSpanProcessor) processQueue() {
	defer bsp.timer.Stop()

	for {
		select {
		case <-bsp.stopCh:
			return

		case <-bsp.timer.C:
			bsp.exportSpans()

		case span := <-bsp.queue:
			bsp.batch = append(bsp.batch, span)
			if len(bsp.batch) != bsp.o.MaxExportBatchSize {
				continue
			}
			// The batch is full: stop the timer and, if it had already
			// fired, consume the pending tick before exportSpans re-arms it.
			if stopped := bsp.timer.Stop(); !stopped {
				<-bsp.timer.C
			}
			bsp.exportSpans()
		}
	}
}
// drainQueue empties the queue after processQueue has returned. Spans still
// buffered are batched and exported in groups of MaxExportBatchSize. Once no
// span is immediately available the queue is closed; from then on a receive
// yields any remaining buffered spans and finally nil, at which point the
// last, possibly partial, batch is exported and the function returns.
func (bsp *BatchSpanProcessor) drainQueue() {
	for {
		select {
		case span := <-bsp.queue:
			if span != nil {
				bsp.batch = append(bsp.batch, span)
				if len(bsp.batch) == bsp.o.MaxExportBatchSize {
					bsp.exportSpans()
				}
				continue
			}
			// A nil value marks the end of the queue.
			bsp.exportSpans()
			return

		default:
			close(bsp.queue)
		}
	}
}
// enqueue offers sd to the processor's queue.
//
// Nil and unsampled spans are ignored. A nil span is rejected explicitly: it
// would otherwise cause a nil-pointer panic on the sampling check, and
// drainQueue treats a nil value received from the queue as the end-of-queue
// marker. Spans that arrive after Shutdown has closed stopCh are discarded.
//
// When BlockOnQueueFull is set the call blocks until the queue accepts the
// span. Otherwise a full queue causes the span to be dropped and the dropped
// counter to be incremented.
func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
	if sd == nil || !sd.SpanContext.IsSampled() {
		return
	}

	// This ensures the bsp.queue<- below does not panic as the
	// processor shuts down: the queue may be closed by drainQueue while a
	// send is in flight. Any other panic is propagated unchanged.
	defer func() {
		x := recover()
		switch err := x.(type) {
		case nil:
			return
		case runtime.Error:
			if err.Error() == "send on closed channel" {
				return
			}
		}
		panic(x)
	}()

	// Do not enqueue anything once shutdown has started.
	select {
	case <-bsp.stopCh:
		return
	default:
	}

	if bsp.o.BlockOnQueueFull {
		bsp.queue <- sd
		return
	}

	select {
	case bsp.queue <- sd:
	default:
		// Queue full and blocking disabled: drop the span and count it.
		atomic.AddUint32(&bsp.dropped, 1)
	}
}