You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-06-25 00:16:49 +02:00
Add ForceFlush method to TracerProvider (#1608)
* Add ForceFlush method to TracerProvider The specification requires that a TracerProvider have a ForceFlush method that can be set with a timeout, return any error to the caller, and have all the registered span processors export their spans. This updates the SpanProcessor.ForceFlush method to accept a context and return an error and plumbs this method into a new ForceFlush method of the SDK TracerProvider. Additionally, this corrects the TracerProvider Shutdown method. This method as well needs to return to the caller any failure it encounters according to the specification. This returns an error if it cannot type assert the spanProcessorStates or if shutting down a span processor results in an error. Resolves #1606 * Add changes to changelog * Apply suggestions from code review Co-authored-by: Steven E. Harris <seh@panix.com> * Cancel export context when BSP stops * Defer cancel call in BSP span processor funcs Co-authored-by: Steven E. Harris <seh@panix.com>
This commit is contained in:
@ -147,8 +147,8 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// ForceFlush exports all ended spans that have not yet been exported.
|
||||
func (bsp *batchSpanProcessor) ForceFlush() {
|
||||
bsp.exportSpans()
|
||||
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
|
||||
return bsp.exportSpans(ctx)
|
||||
}
|
||||
|
||||
func WithMaxQueueSize(size int) BatchSpanProcessorOption {
|
||||
@ -176,18 +176,19 @@ func WithBlocking() BatchSpanProcessorOption {
|
||||
}
|
||||
|
||||
// exportSpans is a subroutine of processing and draining the queue.
|
||||
func (bsp *batchSpanProcessor) exportSpans() {
|
||||
func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
|
||||
bsp.timer.Reset(bsp.o.BatchTimeout)
|
||||
|
||||
bsp.batchMutex.Lock()
|
||||
defer bsp.batchMutex.Unlock()
|
||||
|
||||
if len(bsp.batch) > 0 {
|
||||
if err := bsp.e.ExportSpans(context.Background(), bsp.batch); err != nil {
|
||||
otel.Handle(err)
|
||||
if err := bsp.e.ExportSpans(ctx, bsp.batch); err != nil {
|
||||
return err
|
||||
}
|
||||
bsp.batch = bsp.batch[:0]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// processQueue removes spans from the `queue` channel until processor
|
||||
@ -196,12 +197,16 @@ func (bsp *batchSpanProcessor) exportSpans() {
|
||||
func (bsp *batchSpanProcessor) processQueue() {
|
||||
defer bsp.timer.Stop()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
for {
|
||||
select {
|
||||
case <-bsp.stopCh:
|
||||
return
|
||||
case <-bsp.timer.C:
|
||||
bsp.exportSpans()
|
||||
if err := bsp.exportSpans(ctx); err != nil {
|
||||
otel.Handle(err)
|
||||
}
|
||||
case sd := <-bsp.queue:
|
||||
bsp.batchMutex.Lock()
|
||||
bsp.batch = append(bsp.batch, sd)
|
||||
@ -211,7 +216,9 @@ func (bsp *batchSpanProcessor) processQueue() {
|
||||
if !bsp.timer.Stop() {
|
||||
<-bsp.timer.C
|
||||
}
|
||||
bsp.exportSpans()
|
||||
if err := bsp.exportSpans(ctx); err != nil {
|
||||
otel.Handle(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -220,11 +227,15 @@ func (bsp *batchSpanProcessor) processQueue() {
|
||||
// drainQueue awaits the any caller that had added to bsp.stopWait
|
||||
// to finish the enqueue, then exports the final batch.
|
||||
func (bsp *batchSpanProcessor) drainQueue() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
for {
|
||||
select {
|
||||
case sd := <-bsp.queue:
|
||||
if sd == nil {
|
||||
bsp.exportSpans()
|
||||
if err := bsp.exportSpans(ctx); err != nil {
|
||||
otel.Handle(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -234,7 +245,9 @@ func (bsp *batchSpanProcessor) drainQueue() {
|
||||
bsp.batchMutex.Unlock()
|
||||
|
||||
if shouldExport {
|
||||
bsp.exportSpans()
|
||||
if err := bsp.exportSpans(ctx); err != nil {
|
||||
otel.Handle(err)
|
||||
}
|
||||
}
|
||||
default:
|
||||
close(bsp.queue)
|
||||
|
Reference in New Issue
Block a user