mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-26 03:52:03 +02:00
cbcd4b1a3d
* Remove TODO from ReadOnlySpan interface * Remove the Tracer method from the ReadOnlySpan This is not required by the specification nor the use of this interface. * Remove IsRecording from the ReadOnlySpan interface A read-only span value does not need to know if updates to it will be recorded. It by definition cannot be updated so no point in communicating if an update would be recorded. * Document the ReadOnlySpan interface * Rename messageEvent* to just event* * Move the SpanSnapshot into its own file * Update ReadOnlySpan interface with meta info methods Add the DroppedAttributes, DroppedLinks, DroppedEvents, and ChildSpanCount methods to the interface to return additional information about the span not specified by the specification, but that we are already providing. * Add SpanStub to the sdk/trace/tracetest pkg * Redefine ExportSpans of SpanExporter with ReadOnlySpan * Rename SpanSnapshot to snapshot and purge docs * Remove Snapshot method from snapshot type This method is a hold-over from previous version of the ReadOnlySpan interface is not needed. * Update CHANGELOG with changes
334 lines
8.5 KiB
Go
334 lines
8.5 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package trace // import "go.opentelemetry.io/otel/sdk/trace"
|
|
|
|
import (
|
|
"context"
|
|
"runtime"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"go.opentelemetry.io/otel"
|
|
)
|
|
|
|
const (
|
|
DefaultMaxQueueSize = 2048
|
|
DefaultBatchTimeout = 5000 * time.Millisecond
|
|
DefaultExportTimeout = 30000 * time.Millisecond
|
|
DefaultMaxExportBatchSize = 512
|
|
)
|
|
|
|
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
|
|
|
|
type BatchSpanProcessorOptions struct {
|
|
// MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the
|
|
// queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior.
|
|
// The default value of MaxQueueSize is 2048.
|
|
MaxQueueSize int
|
|
|
|
// BatchTimeout is the maximum duration for constructing a batch. Processor
|
|
// forcefully sends available spans when timeout is reached.
|
|
// The default value of BatchTimeout is 5000 msec.
|
|
BatchTimeout time.Duration
|
|
|
|
// ExportTimeout specifies the maximum duration for exporting spans. If the timeout
|
|
// is reached, the export will be cancelled.
|
|
// The default value of ExportTimeout is 30000 msec.
|
|
ExportTimeout time.Duration
|
|
|
|
// MaxExportBatchSize is the maximum number of spans to process in a single batch.
|
|
// If there are more than one batch worth of spans then it processes multiple batches
|
|
// of spans one batch after the other without any delay.
|
|
// The default value of MaxExportBatchSize is 512.
|
|
MaxExportBatchSize int
|
|
|
|
// BlockOnQueueFull blocks onEnd() and onStart() method if the queue is full
|
|
// AND if BlockOnQueueFull is set to true.
|
|
// Blocking option should be used carefully as it can severely affect the performance of an
|
|
// application.
|
|
BlockOnQueueFull bool
|
|
}
|
|
|
|
// batchSpanProcessor is a SpanProcessor that batches asynchronously-received
|
|
// spans and sends them to a trace.Exporter when complete.
|
|
type batchSpanProcessor struct {
|
|
e SpanExporter
|
|
o BatchSpanProcessorOptions
|
|
|
|
queue chan ReadOnlySpan
|
|
dropped uint32
|
|
|
|
batch []ReadOnlySpan
|
|
batchMutex sync.Mutex
|
|
timer *time.Timer
|
|
stopWait sync.WaitGroup
|
|
stopOnce sync.Once
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
var _ SpanProcessor = (*batchSpanProcessor)(nil)
|
|
|
|
// NewBatchSpanProcessor creates a new SpanProcessor that will send completed
|
|
// span batches to the exporter with the supplied options.
|
|
//
|
|
// If the exporter is nil, the span processor will preform no action.
|
|
func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
|
|
o := BatchSpanProcessorOptions{
|
|
BatchTimeout: DefaultBatchTimeout,
|
|
ExportTimeout: DefaultExportTimeout,
|
|
MaxQueueSize: DefaultMaxQueueSize,
|
|
MaxExportBatchSize: DefaultMaxExportBatchSize,
|
|
}
|
|
for _, opt := range options {
|
|
opt(&o)
|
|
}
|
|
bsp := &batchSpanProcessor{
|
|
e: exporter,
|
|
o: o,
|
|
batch: make([]ReadOnlySpan, 0, o.MaxExportBatchSize),
|
|
timer: time.NewTimer(o.BatchTimeout),
|
|
queue: make(chan ReadOnlySpan, o.MaxQueueSize),
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
|
|
bsp.stopWait.Add(1)
|
|
go func() {
|
|
defer bsp.stopWait.Done()
|
|
bsp.processQueue()
|
|
bsp.drainQueue()
|
|
}()
|
|
|
|
return bsp
|
|
}
|
|
|
|
// OnStart method does nothing.
|
|
func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {}
|
|
|
|
// OnEnd method enqueues a ReadOnlySpan for later processing.
|
|
func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
|
|
// Do not enqueue spans if we are just going to drop them.
|
|
if bsp.e == nil {
|
|
return
|
|
}
|
|
bsp.enqueue(s)
|
|
}
|
|
|
|
// Shutdown flushes the queue and waits until all spans are processed.
|
|
// It only executes once. Subsequent call does nothing.
|
|
func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
|
|
var err error
|
|
bsp.stopOnce.Do(func() {
|
|
wait := make(chan struct{})
|
|
go func() {
|
|
close(bsp.stopCh)
|
|
bsp.stopWait.Wait()
|
|
if bsp.e != nil {
|
|
if err := bsp.e.Shutdown(ctx); err != nil {
|
|
otel.Handle(err)
|
|
}
|
|
}
|
|
close(wait)
|
|
}()
|
|
// Wait until the wait group is done or the context is cancelled
|
|
select {
|
|
case <-wait:
|
|
case <-ctx.Done():
|
|
err = ctx.Err()
|
|
}
|
|
})
|
|
return err
|
|
}
|
|
|
|
// ForceFlush exports all ended spans that have not yet been exported.
|
|
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
|
|
var err error
|
|
if bsp.e != nil {
|
|
wait := make(chan error)
|
|
go func() {
|
|
wait <- bsp.exportSpans(ctx)
|
|
close(wait)
|
|
}()
|
|
// Wait until the export is finished or the context is cancelled/timed out
|
|
select {
|
|
case err = <-wait:
|
|
case <-ctx.Done():
|
|
err = ctx.Err()
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
func WithMaxQueueSize(size int) BatchSpanProcessorOption {
|
|
return func(o *BatchSpanProcessorOptions) {
|
|
o.MaxQueueSize = size
|
|
}
|
|
}
|
|
|
|
func WithMaxExportBatchSize(size int) BatchSpanProcessorOption {
|
|
return func(o *BatchSpanProcessorOptions) {
|
|
o.MaxExportBatchSize = size
|
|
}
|
|
}
|
|
|
|
func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption {
|
|
return func(o *BatchSpanProcessorOptions) {
|
|
o.BatchTimeout = delay
|
|
}
|
|
}
|
|
|
|
func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption {
|
|
return func(o *BatchSpanProcessorOptions) {
|
|
o.ExportTimeout = timeout
|
|
}
|
|
}
|
|
|
|
func WithBlocking() BatchSpanProcessorOption {
|
|
return func(o *BatchSpanProcessorOptions) {
|
|
o.BlockOnQueueFull = true
|
|
}
|
|
}
|
|
|
|
// exportSpans is a subroutine of processing and draining the queue.
|
|
func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
|
|
bsp.timer.Reset(bsp.o.BatchTimeout)
|
|
|
|
bsp.batchMutex.Lock()
|
|
defer bsp.batchMutex.Unlock()
|
|
|
|
if bsp.o.ExportTimeout > 0 {
|
|
var cancel context.CancelFunc
|
|
ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout)
|
|
defer cancel()
|
|
}
|
|
|
|
if l := len(bsp.batch); l > 0 {
|
|
err := bsp.e.ExportSpans(ctx, bsp.batch)
|
|
|
|
// A new batch is always created after exporting, even if the batch failed to be exported.
|
|
//
|
|
// It is up to the exporter to implement any type of retry logic if a batch is failing
|
|
// to be exported, since it is specific to the protocol and backend being sent to.
|
|
bsp.batch = bsp.batch[:0]
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// processQueue removes spans from the `queue` channel until processor
|
|
// is shut down. It calls the exporter in batches of up to MaxExportBatchSize
|
|
// waiting up to BatchTimeout to form a batch.
|
|
func (bsp *batchSpanProcessor) processQueue() {
|
|
defer bsp.timer.Stop()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
for {
|
|
select {
|
|
case <-bsp.stopCh:
|
|
return
|
|
case <-bsp.timer.C:
|
|
if err := bsp.exportSpans(ctx); err != nil {
|
|
otel.Handle(err)
|
|
}
|
|
case sd := <-bsp.queue:
|
|
bsp.batchMutex.Lock()
|
|
bsp.batch = append(bsp.batch, sd)
|
|
shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
|
|
bsp.batchMutex.Unlock()
|
|
if shouldExport {
|
|
if !bsp.timer.Stop() {
|
|
<-bsp.timer.C
|
|
}
|
|
if err := bsp.exportSpans(ctx); err != nil {
|
|
otel.Handle(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// drainQueue awaits the any caller that had added to bsp.stopWait
|
|
// to finish the enqueue, then exports the final batch.
|
|
func (bsp *batchSpanProcessor) drainQueue() {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
for {
|
|
select {
|
|
case sd := <-bsp.queue:
|
|
if sd == nil {
|
|
if err := bsp.exportSpans(ctx); err != nil {
|
|
otel.Handle(err)
|
|
}
|
|
return
|
|
}
|
|
|
|
bsp.batchMutex.Lock()
|
|
bsp.batch = append(bsp.batch, sd)
|
|
shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
|
|
bsp.batchMutex.Unlock()
|
|
|
|
if shouldExport {
|
|
if err := bsp.exportSpans(ctx); err != nil {
|
|
otel.Handle(err)
|
|
}
|
|
}
|
|
default:
|
|
close(bsp.queue)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
|
|
if !sd.SpanContext().IsSampled() {
|
|
return
|
|
}
|
|
|
|
// This ensures the bsp.queue<- below does not panic as the
|
|
// processor shuts down.
|
|
defer func() {
|
|
x := recover()
|
|
switch err := x.(type) {
|
|
case nil:
|
|
return
|
|
case runtime.Error:
|
|
if err.Error() == "send on closed channel" {
|
|
return
|
|
}
|
|
}
|
|
panic(x)
|
|
}()
|
|
|
|
select {
|
|
case <-bsp.stopCh:
|
|
return
|
|
default:
|
|
}
|
|
|
|
if bsp.o.BlockOnQueueFull {
|
|
bsp.queue <- sd
|
|
return
|
|
}
|
|
|
|
select {
|
|
case bsp.queue <- sd:
|
|
default:
|
|
atomic.AddUint32(&bsp.dropped, 1)
|
|
}
|
|
}
|