1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-02-07 13:31:42 +02:00

Batched span processor (#154)

* add batch span processor.

* add blocking support.

* use With* function for options.
- also changed how Shutdown is handled.

* block Shutdown until queue is flushed.

* fix comment.
This commit is contained in:
rghetia 2019-10-01 13:50:51 -07:00 committed by GitHub
parent a936b8fb00
commit 3d5b2fa328
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 433 additions and 5 deletions

View File

@ -29,8 +29,7 @@ func main() {
trace.Register()
ctx := context.Background()
// Register the Jaeger exporter to be able to retrieve
// the collected spans.
// Create Jaeger Exporter
exporter, err := jaeger.NewExporter(jaeger.Options{
CollectorEndpoint: "http://localhost:14268/api/traces",
Process: jaeger.Process{
@ -40,7 +39,10 @@ func main() {
if err != nil {
log.Fatal(err)
}
trace.RegisterExporter(exporter)
// Wrap exporter with SimpleSpanProcessor and register the processor.
ssp := trace.NewSimpleSpanProcessor(exporter)
trace.RegisterSpanProcessor(ssp)
// For demoing purposes, always sample. In a production application, you should
// configure this to a trace.ProbabilitySampler set at the desired

View File

@ -0,0 +1,209 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package trace
import (
"context"
"errors"
"sync"
"time"
)
const (
defaultMaxQueueSize = 2048
defaultScheduledDelayMillis = time.Duration(5000 * time.Millisecond)
defaultMaxExportBatchSize = 512
)
var (
errNilExporter = errors.New("exporter is nil")
)
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
type BatchSpanProcessorOptions struct {
// MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the
// queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior.
// The default value of MaxQueueSize is 2048.
MaxQueueSize int
// ScheduledDelayMillis is the delay interval in milliseconds between two consecutive
// processing of batches.
// The default value of ScheduledDelayMillis is 5000 msec.
ScheduledDelayMillis time.Duration
// MaxExportBatchSize is the maximum number of spans to process in a single batch.
// If there are more than one batch worth of spans then it processes multiple batches
// of spans one batch after the other without any delay.
// The default value of MaxExportBatchSize is 512.
MaxExportBatchSize int
// BlockOnQueueFull blocks onEnd() and onStart() method if the queue is full
// AND if BlockOnQueueFull is set to true.
// Blocking option should be used carefully as it can severely affect the performance of an
// application.
BlockOnQueueFull bool
}
// BatchSpanProcessor implements SpanProcessor interfaces. It is used by
// exporters to receive SpanData asynchronously.
// Use BatchSpanProcessorOptions to change the behavior of the processor.
type BatchSpanProcessor struct {
exporter BatchExporter
o BatchSpanProcessorOptions
queue chan *SpanData
dropped uint32
stopWait sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
}
var _ SpanProcessor = (*BatchSpanProcessor)(nil)
// NewBatchSpanProcessor creates a new instance of BatchSpanProcessor
// for a given exporter. It returns an error if exporter is nil.
// The newly created BatchSpanProcessor should then be registered with sdk
// using RegisterSpanProcessor.
func NewBatchSpanProcessor(exporter BatchExporter, opts ...BatchSpanProcessorOption) (*BatchSpanProcessor, error) {
if exporter == nil {
return nil, errNilExporter
}
o := BatchSpanProcessorOptions{
ScheduledDelayMillis: defaultScheduledDelayMillis,
MaxQueueSize: defaultMaxQueueSize,
MaxExportBatchSize: defaultMaxExportBatchSize,
}
for _, opt := range opts {
opt(&o)
}
bsp := &BatchSpanProcessor{
exporter: exporter,
o: o,
}
bsp.queue = make(chan *SpanData, bsp.o.MaxQueueSize)
bsp.stopCh = make(chan struct{})
//Start timer to export metrics
ticker := time.NewTicker(bsp.o.ScheduledDelayMillis)
go func(ctx context.Context) {
defer ticker.Stop()
bsp.stopWait.Add(1)
for {
select {
case <-bsp.stopCh:
bsp.processQueue()
close(bsp.queue)
bsp.stopWait.Done()
return
case <-ticker.C:
bsp.processQueue()
}
}
}(context.Background())
return bsp, nil
}
// OnStart method does nothing.
func (bsp *BatchSpanProcessor) OnStart(sd *SpanData) {
}
// OnEnd method enqueues SpanData for later processing.
func (bsp *BatchSpanProcessor) OnEnd(sd *SpanData) {
bsp.enqueue(sd)
}
// Shutdown flushes the queue and waits until all spans are processed.
// It only executes once. Subsequent call does nothing.
func (bsp *BatchSpanProcessor) Shutdown() {
bsp.stopOnce.Do(func() {
close(bsp.stopCh)
bsp.stopWait.Wait()
})
}
func WithMaxQueueSize(size int) BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.MaxQueueSize = size
}
}
func WithMaxExportBatchSize(size int) BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.MaxExportBatchSize = size
}
}
func WithScheduleDelayMillis(delay time.Duration) BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.ScheduledDelayMillis = delay
}
}
func WithBlocking() BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.BlockOnQueueFull = true
}
}
func (bsp *BatchSpanProcessor) processQueue() {
batch := make([]*SpanData, 0, bsp.o.MaxExportBatchSize)
for {
var sd *SpanData
var ok bool
select {
case sd = <-bsp.queue:
if sd != nil {
batch = append(batch, sd)
}
ok = true
default:
ok = false
}
if ok {
if len(batch) >= bsp.o.MaxExportBatchSize {
bsp.exporter.ExportSpans(batch)
batch = batch[:0]
}
} else {
if len(batch) > 0 {
bsp.exporter.ExportSpans(batch)
}
break
}
}
}
func (bsp *BatchSpanProcessor) enqueue(sd *SpanData) {
if bsp.o.BlockOnQueueFull {
bsp.queue <- sd
} else {
var ok bool
select {
case bsp.queue <- sd:
ok = true
default:
ok = false
}
if !ok {
bsp.dropped++
}
}
}

View File

@ -0,0 +1,204 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package trace_test
import (
"context"
"sync"
"testing"
"time"
"go.opentelemetry.io/api/core"
apitrace "go.opentelemetry.io/api/trace"
sdktrace "go.opentelemetry.io/sdk/trace"
)
type testBatchExporter struct {
mu sync.Mutex
spans []*sdktrace.SpanData
sizes []int
batchCount int
}
func (t *testBatchExporter) ExportSpans(sds []*sdktrace.SpanData) {
t.mu.Lock()
defer t.mu.Unlock()
t.spans = append(t.spans, sds...)
t.sizes = append(t.sizes, len(sds))
t.batchCount++
}
func (t *testBatchExporter) len() int {
t.mu.Lock()
defer t.mu.Unlock()
return len(t.spans)
}
func (t *testBatchExporter) getBatchCount() int {
t.mu.Lock()
defer t.mu.Unlock()
return t.batchCount
}
func (t *testBatchExporter) get(idx int) *sdktrace.SpanData {
t.mu.Lock()
defer t.mu.Unlock()
return t.spans[idx]
}
var _ sdktrace.BatchExporter = (*testBatchExporter)(nil)
func init() {
sdktrace.Register()
sdktrace.ApplyConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()})
}
func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
_, err := sdktrace.NewBatchSpanProcessor(nil)
if err == nil {
t.Errorf("Expected error while creating processor with nil exporter")
}
}
type testOption struct {
name string
o []sdktrace.BatchSpanProcessorOption
wantNumSpans int
wantBatchCount int
genNumSpans int
waitTime time.Duration
}
func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
schDelay := time.Duration(200 * time.Millisecond)
waitTime := schDelay + time.Duration(100*time.Millisecond)
options := []testOption{
{
name: "default BatchSpanProcessorOptions",
wantNumSpans: 2048,
wantBatchCount: 4,
genNumSpans: 2053,
waitTime: time.Duration(5100 * time.Millisecond),
},
{
name: "non-default ScheduledDelayMillis",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithScheduleDelayMillis(schDelay),
},
wantNumSpans: 2048,
wantBatchCount: 4,
genNumSpans: 2053,
waitTime: waitTime,
},
{
name: "non-default MaxQueueSize and ScheduledDelayMillis",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithScheduleDelayMillis(schDelay),
sdktrace.WithMaxQueueSize(200),
},
wantNumSpans: 200,
wantBatchCount: 1,
genNumSpans: 205,
waitTime: waitTime,
},
{
name: "non-default MaxQueueSize, ScheduledDelayMillis and MaxExportBatchSize",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithScheduleDelayMillis(schDelay),
sdktrace.WithMaxQueueSize(205),
sdktrace.WithMaxExportBatchSize(20),
},
wantNumSpans: 205,
wantBatchCount: 11,
genNumSpans: 210,
waitTime: waitTime,
},
{
name: "blocking option",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithScheduleDelayMillis(schDelay),
sdktrace.WithMaxQueueSize(200),
sdktrace.WithMaxExportBatchSize(20),
sdktrace.WithBlocking(),
},
wantNumSpans: 205,
wantBatchCount: 11,
genNumSpans: 205,
waitTime: waitTime,
},
}
for _, option := range options {
te := testBatchExporter{}
ssp := createAndRegisterBatchSP(t, option, &te)
if ssp == nil {
t.Errorf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
}
sdktrace.RegisterSpanProcessor(ssp)
generateSpan(t, option)
time.Sleep(option.waitTime)
gotNumOfSpans := te.len()
if option.wantNumSpans != gotNumOfSpans {
t.Errorf("%s: number of exported span: got %+v, want %+v\n", option.name, gotNumOfSpans, option.wantNumSpans)
}
gotBatchCount := te.getBatchCount()
if gotBatchCount < option.wantBatchCount {
t.Errorf("%s: number batches: got %+v, want >= %+v\n", option.name, gotBatchCount, option.wantBatchCount)
t.Errorf("Batches %v\n", te.sizes)
}
// Check first Span is reported. Most recent one is dropped.
sc := getSpanContext()
wantTraceID := sc.TraceID
wantTraceID.High = 1
gotTraceID := te.get(0).SpanContext.TraceID
if wantTraceID != gotTraceID {
t.Errorf("%s: first exported span: got %+v, want %+v\n", option.name, gotTraceID, wantTraceID)
}
sdktrace.UnregisterSpanProcessor(ssp)
}
}
func createAndRegisterBatchSP(t *testing.T, option testOption, te *testBatchExporter) *sdktrace.BatchSpanProcessor {
ssp, err := sdktrace.NewBatchSpanProcessor(te, option.o...)
if ssp == nil {
t.Errorf("%s: Error creating new instance of BatchSpanProcessor, error: %v\n", option.name, err)
}
sdktrace.RegisterSpanProcessor(ssp)
return ssp
}
func generateSpan(t *testing.T, option testOption) {
sc := getSpanContext()
for i := 0; i < option.genNumSpans; i++ {
sc.TraceID.High = uint64(i + 1)
_, span := apitrace.GlobalTracer().Start(context.Background(), option.name, apitrace.ChildOf(sc))
span.End()
}
}
func getSpanContext() core.SpanContext {
tid := core.TraceID{High: 0x0102030405060708, Low: 0x0102040810203040}
sid := uint64(0x0102040810203040)
return core.SpanContext{
TraceID: tid,
SpanID: sid,
TraceFlags: 0x1,
}
}

View File

@ -25,6 +25,16 @@ import (
apitrace "go.opentelemetry.io/api/trace"
)
// BatchExporter is a type for functions that receive sampled trace spans.
//
// The ExportSpans method is called asynchronously. However BatchExporter should
// not take forever to process the spans.
//
// The SpanData should not be modified.
type BatchExporter interface {
ExportSpans(sds []*SpanData)
}
// Exporter is a type for functions that receive sampled trace spans.
//
// The ExportSpan method should be safe for concurrent use and should return
@ -47,6 +57,7 @@ var (
// trace spans.
//
// Binaries can register exporters, libraries shouldn't register exporters.
// TODO(rghetia) : Remove it.
func RegisterExporter(e Exporter) {
exporterMu.Lock()
defer exporterMu.Unlock()
@ -62,6 +73,7 @@ func RegisterExporter(e Exporter) {
// UnregisterExporter removes from the list of Exporters the Exporter that was
// registered with the given name.
// TODO(rghetia) : Remove it.
func UnregisterExporter(e Exporter) {
exporterMu.Lock()
defer exporterMu.Unlock()

View File

@ -60,7 +60,7 @@ func RegisterSpanProcessor(e SpanProcessor) {
// UnregisterSpanProcessor removes from the list of SpanProcessors the SpanProcessor that was
// registered with the given name.
func UnregisterSpanProcessor(e SpanProcessor) {
func UnregisterSpanProcessor(s SpanProcessor) {
mu.Lock()
defer mu.Unlock()
new := make(spanProcessorMap)
@ -69,6 +69,7 @@ func UnregisterSpanProcessor(e SpanProcessor) {
new[k] = v
}
}
delete(new, e)
delete(new, s)
spanProcessors.Store(new)
s.Shutdown()
}