1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-11-25 22:41:46 +02:00

Unexport the simple and batch SpanProcessors (#1638)

* Unexport the simple and batch SpanProcessors

* Update changes in changelog
This commit is contained in:
Tyler Yahn
2021-03-05 16:08:29 +00:00
committed by GitHub
parent 992837f195
commit 62e2a0f766
4 changed files with 32 additions and 30 deletions

View File

@@ -8,6 +8,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## [Unreleased] ## [Unreleased]
### Removed
- Removed the exported `SimpleSpanProcessor` and `BatchSpanProcessor` structs.
These are now returned as a SpanProcessor interface from their respective constructors. (#1638)
## [0.18.0] - 2020-03-03 ## [0.18.0] - 2020-03-03
### Added ### Added

View File

@@ -57,9 +57,9 @@ type BatchSpanProcessorOptions struct {
BlockOnQueueFull bool BlockOnQueueFull bool
} }
// BatchSpanProcessor is a SpanProcessor that batches asynchronously-received // batchSpanProcessor is a SpanProcessor that batches asynchronously-received
// SpanSnapshots and sends them to a trace.Exporter when complete. // SpanSnapshots and sends them to a trace.Exporter when complete.
type BatchSpanProcessor struct { type batchSpanProcessor struct {
e export.SpanExporter e export.SpanExporter
o BatchSpanProcessorOptions o BatchSpanProcessorOptions
@@ -74,16 +74,13 @@ type BatchSpanProcessor struct {
stopCh chan struct{} stopCh chan struct{}
} }
var _ SpanProcessor = (*BatchSpanProcessor)(nil) var _ SpanProcessor = (*batchSpanProcessor)(nil)
// NewBatchSpanProcessor creates a new BatchSpanProcessor that will send // NewBatchSpanProcessor creates a new SpanProcessor that will send completed
// SpanSnapshot batches to the exporters with the supplied options. // span batches to the exporter with the supplied options.
//
// The returned BatchSpanProcessor needs to be registered with the SDK using
// the RegisterSpanProcessor method for it to process spans.
// //
// If the exporter is nil, the span processor will preform no action. // If the exporter is nil, the span processor will preform no action.
func NewBatchSpanProcessor(exporter export.SpanExporter, options ...BatchSpanProcessorOption) *BatchSpanProcessor { func NewBatchSpanProcessor(exporter export.SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
o := BatchSpanProcessorOptions{ o := BatchSpanProcessorOptions{
BatchTimeout: DefaultBatchTimeout, BatchTimeout: DefaultBatchTimeout,
MaxQueueSize: DefaultMaxQueueSize, MaxQueueSize: DefaultMaxQueueSize,
@@ -92,7 +89,7 @@ func NewBatchSpanProcessor(exporter export.SpanExporter, options ...BatchSpanPro
for _, opt := range options { for _, opt := range options {
opt(&o) opt(&o)
} }
bsp := &BatchSpanProcessor{ bsp := &batchSpanProcessor{
e: exporter, e: exporter,
o: o, o: o,
batch: make([]*export.SpanSnapshot, 0, o.MaxExportBatchSize), batch: make([]*export.SpanSnapshot, 0, o.MaxExportBatchSize),
@@ -112,10 +109,10 @@ func NewBatchSpanProcessor(exporter export.SpanExporter, options ...BatchSpanPro
} }
// OnStart method does nothing. // OnStart method does nothing.
func (bsp *BatchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {} func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {}
// OnEnd method enqueues a ReadOnlySpan for later processing. // OnEnd method enqueues a ReadOnlySpan for later processing.
func (bsp *BatchSpanProcessor) OnEnd(s ReadOnlySpan) { func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
// Do not enqueue spans if we are just going to drop them. // Do not enqueue spans if we are just going to drop them.
if bsp.e == nil { if bsp.e == nil {
return return
@@ -125,7 +122,7 @@ func (bsp *BatchSpanProcessor) OnEnd(s ReadOnlySpan) {
// Shutdown flushes the queue and waits until all spans are processed. // Shutdown flushes the queue and waits until all spans are processed.
// It only executes once. Subsequent call does nothing. // It only executes once. Subsequent call does nothing.
func (bsp *BatchSpanProcessor) Shutdown(ctx context.Context) error { func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
var err error var err error
bsp.stopOnce.Do(func() { bsp.stopOnce.Do(func() {
wait := make(chan struct{}) wait := make(chan struct{})
@@ -150,7 +147,7 @@ func (bsp *BatchSpanProcessor) Shutdown(ctx context.Context) error {
} }
// ForceFlush exports all ended spans that have not yet been exported. // ForceFlush exports all ended spans that have not yet been exported.
func (bsp *BatchSpanProcessor) ForceFlush() { func (bsp *batchSpanProcessor) ForceFlush() {
bsp.exportSpans() bsp.exportSpans()
} }
@@ -179,7 +176,7 @@ func WithBlocking() BatchSpanProcessorOption {
} }
// exportSpans is a subroutine of processing and draining the queue. // exportSpans is a subroutine of processing and draining the queue.
func (bsp *BatchSpanProcessor) exportSpans() { func (bsp *batchSpanProcessor) exportSpans() {
bsp.timer.Reset(bsp.o.BatchTimeout) bsp.timer.Reset(bsp.o.BatchTimeout)
bsp.batchMutex.Lock() bsp.batchMutex.Lock()
@@ -196,7 +193,7 @@ func (bsp *BatchSpanProcessor) exportSpans() {
// processQueue removes spans from the `queue` channel until processor // processQueue removes spans from the `queue` channel until processor
// is shut down. It calls the exporter in batches of up to MaxExportBatchSize // is shut down. It calls the exporter in batches of up to MaxExportBatchSize
// waiting up to BatchTimeout to form a batch. // waiting up to BatchTimeout to form a batch.
func (bsp *BatchSpanProcessor) processQueue() { func (bsp *batchSpanProcessor) processQueue() {
defer bsp.timer.Stop() defer bsp.timer.Stop()
for { for {
@@ -222,7 +219,7 @@ func (bsp *BatchSpanProcessor) processQueue() {
// drainQueue awaits the any caller that had added to bsp.stopWait // drainQueue awaits the any caller that had added to bsp.stopWait
// to finish the enqueue, then exports the final batch. // to finish the enqueue, then exports the final batch.
func (bsp *BatchSpanProcessor) drainQueue() { func (bsp *batchSpanProcessor) drainQueue() {
for { for {
select { select {
case sd := <-bsp.queue: case sd := <-bsp.queue:
@@ -245,7 +242,7 @@ func (bsp *BatchSpanProcessor) drainQueue() {
} }
} }
func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanSnapshot) { func (bsp *batchSpanProcessor) enqueue(sd *export.SpanSnapshot) {
if !sd.SpanContext.IsSampled() { if !sd.SpanContext.IsSampled() {
return return
} }

View File

@@ -198,7 +198,7 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
} }
} }
func createAndRegisterBatchSP(option testOption, te *testBatchExporter) *sdktrace.BatchSpanProcessor { func createAndRegisterBatchSP(option testOption, te *testBatchExporter) sdktrace.SpanProcessor {
// Always use blocking queue to avoid flaky tests. // Always use blocking queue to avoid flaky tests.
options := append(option.o, sdktrace.WithBlocking()) options := append(option.o, sdktrace.WithBlocking())
return sdktrace.NewBatchSpanProcessor(te, options...) return sdktrace.NewBatchSpanProcessor(te, options...)

View File

@@ -21,29 +21,29 @@ import (
export "go.opentelemetry.io/otel/sdk/export/trace" export "go.opentelemetry.io/otel/sdk/export/trace"
) )
// SimpleSpanProcessor is a SpanProcessor that synchronously sends all // simpleSpanProcessor is a SpanProcessor that synchronously sends all
// SpanSnapshots to a trace.Exporter when the span finishes. // SpanSnapshots to a trace.Exporter when the span finishes.
type SimpleSpanProcessor struct { type simpleSpanProcessor struct {
e export.SpanExporter e export.SpanExporter
} }
var _ SpanProcessor = (*SimpleSpanProcessor)(nil) var _ SpanProcessor = (*simpleSpanProcessor)(nil)
// NewSimpleSpanProcessor returns a new SimpleSpanProcessor that will // NewSimpleSpanProcessor returns a new SpanProcessor that will synchronously
// synchronously send SpanSnapshots to the exporter. // send completed spans to the exporter immediately.
func NewSimpleSpanProcessor(exporter export.SpanExporter) *SimpleSpanProcessor { func NewSimpleSpanProcessor(exporter export.SpanExporter) SpanProcessor {
ssp := &SimpleSpanProcessor{ ssp := &simpleSpanProcessor{
e: exporter, e: exporter,
} }
return ssp return ssp
} }
// OnStart method does nothing. // OnStart method does nothing.
func (ssp *SimpleSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) { func (ssp *simpleSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {
} }
// OnEnd method exports a ReadOnlySpan using the associated exporter. // OnEnd method exports a ReadOnlySpan using the associated exporter.
func (ssp *SimpleSpanProcessor) OnEnd(s ReadOnlySpan) { func (ssp *simpleSpanProcessor) OnEnd(s ReadOnlySpan) {
if ssp.e != nil && s.SpanContext().IsSampled() { if ssp.e != nil && s.SpanContext().IsSampled() {
ss := s.Snapshot() ss := s.Snapshot()
if err := ssp.e.ExportSpans(context.Background(), []*export.SpanSnapshot{ss}); err != nil { if err := ssp.e.ExportSpans(context.Background(), []*export.SpanSnapshot{ss}); err != nil {
@@ -53,10 +53,10 @@ func (ssp *SimpleSpanProcessor) OnEnd(s ReadOnlySpan) {
} }
// Shutdown method does nothing. There is no data to cleanup. // Shutdown method does nothing. There is no data to cleanup.
func (ssp *SimpleSpanProcessor) Shutdown(_ context.Context) error { func (ssp *simpleSpanProcessor) Shutdown(_ context.Context) error {
return nil return nil
} }
// ForceFlush does nothing as there is no data to flush. // ForceFlush does nothing as there is no data to flush.
func (ssp *SimpleSpanProcessor) ForceFlush() { func (ssp *simpleSpanProcessor) ForceFlush() {
} }