1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-07-17 01:12:45 +02:00

Correct SDK trace Exporter interface (#1078)

* Update trace export interface

Move to conforming to the specification.

* Update documentation in export trace

* Update sdk trace provider to support new trace exporter

* Update SpanProcessors

Support the Provider changes and new trace exporter.

* Update the SDK to support the changes

* Update trace Provider to not return an error

* Update sdk with new Provider return

Also fix the testExporter ExportSpans method

* Update exporters with changes

* Update examples with changes

* Update Changelog

* Move error handling to end of shutdown

* Update exporter interface

Rename to SpanExporter to match specification. Add an error return value
to the Shutdown method based on feedback. Propagate these changes.

Remove the Stop method from the OTLP exporter to avoid confusion and
redundancy.

* Add test to check OTLP Shutdown honors context

* Add Jaeger exporter test for shutdown

* Fix race in Jaeger test

* Unify shutdown behavior and testing

* Update sdk/trace/simple_span_processor.go

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
This commit is contained in:
Tyler Yahn
2020-09-09 10:19:03 -07:00
committed by GitHub
parent da96fd0c5e
commit 422188a85f
27 changed files with 745 additions and 387 deletions

View File

@ -35,11 +35,12 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added ### Added
- `Noop` and `InMemory` `SpanBatcher` implementations to help with testing integrations. (#994)
- Integration tests for more OTel Collector Attribute types. (#1062)
- A dimensionality-reducing metric Processor. (#1057)
- Support for filtering metric label sets. (#1047)
- Support for exporting array-valued attributes via OTLP. (#992) - Support for exporting array-valued attributes via OTLP. (#992)
- `Noop` and `InMemory` `SpanBatcher` implementations to help with testing integrations. (#994)
- Support for filtering metric label sets. (#1047)
- A dimensionality-reducing metric Processor. (#1057)
- Integration tests for more OTel Collector Attribute types. (#1062)
- A new `WithSpanProcessor` `ProviderOption` is added to the `go.opentelemetry.io/otel/sdk/trace` package to create a `Provider` and automatically register the `SpanProcessor`. (#1078)
### Changed ### Changed
@ -58,6 +59,13 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Unify Callback Function Naming. - Unify Callback Function Naming.
Rename `*Callback` with `*Func`. (#1061) Rename `*Callback` with `*Func`. (#1061)
- CI builds validate against last two versions of Go, dropping 1.13 and adding 1.15. (#1064) - CI builds validate against last two versions of Go, dropping 1.13 and adding 1.15. (#1064)
- The `go.opentelemetry.io/otel/sdk/export/trace` interfaces `SpanSyncer` and `SpanBatcher` have been replaced with a specification compliant `Exporter` interface.
This interface still supports the export of `SpanData`, but only as a slice.
Implementation are also required now to return any error from `ExportSpans` if one occurs as well as implement a `Shutdown` method for exporter clean-up. (#1078)
- The `go.opentelemetry.io/otel/sdk/trace` `NewBatchSpanProcessor` function no longer returns an error.
If a `nil` exporter is passed as an argument to this function, instead of it returning an error, it now returns a `BatchSpanProcessor` that handles the export of `SpanData` by not taking any action. (#1078)
- The `go.opentelemetry.io/otel/sdk/trace` `NewProvider` function to create a `Provider` no longer returns an error, instead only a `*Provider`.
This change is related to `NewBatchSpanProcessor` not returning an error which was the only error this function would return. (#1078)
### Removed ### Removed

View File

@ -43,11 +43,14 @@ func initTracer() {
log.Panicf("failed to initialize stdout exporter %v\n", err) log.Panicf("failed to initialize stdout exporter %v\n", err)
return return
} }
tp, err = sdktrace.NewProvider(sdktrace.WithBatcher(exp), tp = sdktrace.NewProvider(
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()})) sdktrace.WithConfig(
if err != nil { sdktrace.Config{
log.Panicf("failed to initialize trace provider %v\n", err) DefaultSampler: sdktrace.AlwaysSample(),
} },
),
sdktrace.WithBatcher(exp),
)
global.SetTracerProvider(tp) global.SetTracerProvider(tp)
} }

View File

@ -54,7 +54,7 @@ func initProvider() (*otlp.Exporter, *push.Controller) {
) )
handleErr(err, "failed to create exporter") handleErr(err, "failed to create exporter")
tracerProvider, err := sdktrace.NewProvider( tracerProvider := sdktrace.NewProvider(
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithResource(resource.New( sdktrace.WithResource(resource.New(
// the service name used to display traces in backends // the service name used to display traces in backends
@ -62,7 +62,6 @@ func initProvider() (*otlp.Exporter, *push.Controller) {
)), )),
sdktrace.WithBatcher(exp), sdktrace.WithBatcher(exp),
) )
handleErr(err, "failed to create trace provider")
pusher := push.New( pusher := push.New(
basic.New( basic.New(
@ -84,7 +83,9 @@ func main() {
log.Printf("Waiting for connection...") log.Printf("Waiting for connection...")
exp, pusher := initProvider() exp, pusher := initProvider()
defer func() { handleErr(exp.Stop(), "failed to stop exporter") }() defer func() {
handleErr(exp.Shutdown(context.Background()), "failed to stop exporter")
}()
defer pusher.Stop() // pushes any last exports to the receiver defer pusher.Stop() // pushes any last exports to the receiver
tracer := global.Tracer("test-tracer") tracer := global.Tracer("test-tracer")

View File

@ -41,17 +41,13 @@ func main() {
}() }()
// Note: The exporter can also be used as a Batcher. E.g. // Note: The exporter can also be used as a Batcher. E.g.
// tracerProvider, err := sdktrace.NewProvider( // tracerProvider := sdktrace.NewProvider(
// sdktrace.WithBatcher(exporter, // sdktrace.WithBatcher(exporter,
// sdktrace.WithBatchTimeout(time.Second*15), // sdktrace.WithBatchTimeout(time.Second*15),
// sdktrace.WithMaxExportBatchSize(100), // sdktrace.WithMaxExportBatchSize(100),
// ), // ),
// ) // )
tracerProvider, err := sdktrace.NewProvider(sdktrace.WithBatcher(exporter)) tracerProvider := sdktrace.NewProvider(sdktrace.WithBatcher(exporter))
if err != nil {
log.Fatal("failed to create trace provider: %v", err)
}
pusher := push.New(simple.NewWithExactDistribution(), exporter) pusher := push.New(simple.NewWithExactDistribution(), exporter)
pusher.Start() pusher.Start()
metricProvider := pusher.Provider() metricProvider := pusher.Provider()

View File

@ -33,19 +33,22 @@ func Example_insecure() {
log.Fatalf("Failed to create the collector exporter: %v", err) log.Fatalf("Failed to create the collector exporter: %v", err)
} }
defer func() { defer func() {
_ = exp.Stop() ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
global.Handle(err)
}
}() }()
tp, _ := sdktrace.NewProvider( tp := sdktrace.NewProvider(
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithBatcher(exp, // add following two options to ensure flush sdktrace.WithBatcher(
exp,
// add following two options to ensure flush
sdktrace.WithBatchTimeout(5), sdktrace.WithBatchTimeout(5),
sdktrace.WithMaxExportBatchSize(10), sdktrace.WithMaxExportBatchSize(10),
)) ),
if err != nil { )
log.Fatalf("error creating trace provider: %v\n", err)
}
global.SetTracerProvider(tp) global.SetTracerProvider(tp)
tracer := global.Tracer("test-tracer") tracer := global.Tracer("test-tracer")
@ -74,19 +77,22 @@ func Example_withTLS() {
log.Fatalf("failed to create the collector exporter: %v", err) log.Fatalf("failed to create the collector exporter: %v", err)
} }
defer func() { defer func() {
_ = exp.Stop() ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
global.Handle(err)
}
}() }()
tp, err := sdktrace.NewProvider( tp := sdktrace.NewProvider(
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithBatcher(exp, // add following two options to ensure flush sdktrace.WithBatcher(
exp,
// add following two options to ensure flush
sdktrace.WithBatchTimeout(5), sdktrace.WithBatchTimeout(5),
sdktrace.WithMaxExportBatchSize(10), sdktrace.WithMaxExportBatchSize(10),
)) ),
if err != nil { )
log.Fatalf("error creating trace provider: %v\n", err)
}
global.SetTracerProvider(tp) global.SetTracerProvider(tp)
tracer := global.Tracer("test-tracer") tracer := global.Tracer("test-tracer")

View File

@ -28,7 +28,6 @@ import (
colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/otlp/internal/transform" "go.opentelemetry.io/otel/exporters/otlp/internal/transform"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric" metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
@ -57,7 +56,7 @@ type Exporter struct {
metadata metadata.MD metadata metadata.MD
} }
var _ tracesdk.SpanBatcher = (*Exporter)(nil) var _ tracesdk.SpanExporter = (*Exporter)(nil)
var _ metricsdk.Exporter = (*Exporter)(nil) var _ metricsdk.Exporter = (*Exporter)(nil)
func configureOptions(cfg *Config, opts ...ExporterOption) { func configureOptions(cfg *Config, opts ...ExporterOption) {
@ -195,10 +194,14 @@ func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, addr, dialOpts...) return grpc.DialContext(ctx, addr, dialOpts...)
} }
// Stop shuts down all the connections and resources // closeStopCh is used to wrap the exporters stopCh channel closing for testing.
// related to the exporter. var closeStopCh = func(stopCh chan bool) {
// If the exporter is not started then this func does nothing. close(stopCh)
func (e *Exporter) Stop() error { }
// Shutdown closes all connections and releases resources currently being used
// by the exporter. If the exporter is not started this does nothing.
func (e *Exporter) Shutdown(ctx context.Context) error {
e.mu.RLock() e.mu.RLock()
cc := e.grpcClientConn cc := e.grpcClientConn
started := e.started started := e.started
@ -208,9 +211,9 @@ func (e *Exporter) Stop() error {
return nil return nil
} }
// Now close the underlying gRPC connection.
var err error var err error
if cc != nil { if cc != nil {
// Clean things up before checking this error.
err = cc.Close() err = cc.Close()
} }
@ -218,10 +221,14 @@ func (e *Exporter) Stop() error {
e.mu.Lock() e.mu.Lock()
e.started = false e.started = false
e.mu.Unlock() e.mu.Unlock()
close(e.stopCh) closeStopCh(e.stopCh)
// Ensure that the backgroundConnector returns // Ensure that the backgroundConnector returns
<-e.backgroundConnectionDoneCh select {
case <-e.backgroundConnectionDoneCh:
case <-ctx.Done():
return ctx.Err()
}
return err return err
} }
@ -272,27 +279,22 @@ func (e *Exporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) metricsdk
return metricsdk.PassThroughExporter return metricsdk.PassThroughExporter
} }
func (e *Exporter) ExportSpan(ctx context.Context, sd *tracesdk.SpanData) { func (e *Exporter) ExportSpans(ctx context.Context, sds []*tracesdk.SpanData) error {
e.uploadTraces(ctx, []*tracesdk.SpanData{sd}) return e.uploadTraces(ctx, sds)
} }
func (e *Exporter) ExportSpans(ctx context.Context, sds []*tracesdk.SpanData) { func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) error {
e.uploadTraces(ctx, sds)
}
func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) {
select { select {
case <-e.stopCh: case <-e.stopCh:
return return nil
default: default:
if !e.connected() { if !e.connected() {
return return nil
} }
protoSpans := transform.SpanData(sdl) protoSpans := transform.SpanData(sdl)
if len(protoSpans) == 0 { if len(protoSpans) == 0 {
return return nil
} }
e.senderMu.Lock() e.senderMu.Lock()
@ -302,7 +304,8 @@ func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) {
e.senderMu.Unlock() e.senderMu.Unlock()
if err != nil { if err != nil {
e.setStateDisconnected(err) e.setStateDisconnected(err)
global.Handle(err) return err
} }
} }
return nil
} }

View File

@ -79,29 +79,33 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
t.Fatalf("failed to create a new collector exporter: %v", err) t.Fatalf("failed to create a new collector exporter: %v", err)
} }
defer func() { defer func() {
_ = exp.Stop() ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
panic(err)
}
}() }()
pOpts := []sdktrace.ProviderOption{ pOpts := []sdktrace.ProviderOption{
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithBatcher(exp, // add following two options to ensure flush sdktrace.WithBatcher(
sdktrace.WithBatchTimeout(15), exp,
// add following two options to ensure flush
sdktrace.WithBatchTimeout(5),
sdktrace.WithMaxExportBatchSize(10), sdktrace.WithMaxExportBatchSize(10),
), ),
} }
tp1, err := sdktrace.NewProvider(append(pOpts, tp1 := sdktrace.NewProvider(append(pOpts,
sdktrace.WithResource(resource.New( sdktrace.WithResource(resource.New(
label.String("rk1", "rv11)"), label.String("rk1", "rv11)"),
label.Int64("rk2", 5), label.Int64("rk2", 5),
)))...) )))...)
assert.NoError(t, err)
tp2, err := sdktrace.NewProvider(append(pOpts, tp2 := sdktrace.NewProvider(append(pOpts,
sdktrace.WithResource(resource.New( sdktrace.WithResource(resource.New(
label.String("rk1", "rv12)"), label.String("rk1", "rv12)"),
label.Float32("rk3", 6.5), label.Float32("rk3", 6.5),
)))...) )))...)
assert.NoError(t, err)
tr1 := tp1.Tracer("test-tracer1") tr1 := tp1.Tracer("test-tracer1")
tr2 := tp2.Tracer("test-tracer2") tr2 := tp2.Tracer("test-tracer2")
@ -186,7 +190,9 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
<-time.After(40 * time.Millisecond) <-time.After(40 * time.Millisecond)
// Now shutdown the exporter // Now shutdown the exporter
if err := exp.Stop(); err != nil { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf("failed to stop the exporter: %v", err) t.Fatalf("failed to stop the exporter: %v", err)
} }
@ -287,7 +293,9 @@ func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
t.Fatalf("error creating exporter: %v", err) t.Fatalf("error creating exporter: %v", err)
} }
defer func() { defer func() {
_ = exp.Stop() if err := exp.Shutdown(context.Background()); err != nil {
panic(err)
}
}() }()
// Invoke Start numerous times, should return errAlreadyStarted // Invoke Start numerous times, should return errAlreadyStarted
@ -297,10 +305,12 @@ func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
} }
} }
_ = exp.Stop() if err := exp.Shutdown(context.Background()); err != nil {
// Invoke Stop numerous times t.Fatalf("failed to Shutdown the exporter: %v", err)
}
// Invoke Shutdown numerous times
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
if err := exp.Stop(); err != nil { if err := exp.Shutdown(context.Background()); err != nil {
t.Fatalf(`#%d got error (%v) expected none`, i, err) t.Fatalf(`#%d got error (%v) expected none`, i, err)
} }
} }
@ -317,7 +327,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
defer func() { defer func() {
_ = exp.Stop() _ = exp.Shutdown(context.Background())
}() }()
// We'll now stop the collector right away to simulate a connection // We'll now stop the collector right away to simulate a connection
@ -329,7 +339,13 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
// reconnect. // reconnect.
for j := 0; j < 3; j++ { for j := 0; j < 3; j++ {
exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "in the midst"}}) // No endpoint up.
require.Error(
t,
exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused",
mc.address,
)
// Now resurrect the collector by making a new one but reusing the // Now resurrect the collector by making a new one but reusing the
// old address, and the collector should reconnect automatically. // old address, and the collector should reconnect automatically.
@ -340,7 +356,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
n := 10 n := 10
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "Resurrected"}}) require.NoError(t, exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "Resurrected"}}))
} }
nmaSpans := nmc.getSpans() nmaSpans := nmc.getSpans()
@ -381,7 +397,7 @@ func TestNewExporter_collectorOnBadConnection(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Despite an indefinite background reconnection, got error: %v", err) t.Fatalf("Despite an indefinite background reconnection, got error: %v", err)
} }
_ = exp.Stop() _ = exp.Shutdown(context.Background())
} }
func TestNewExporter_withAddress(t *testing.T) { func TestNewExporter_withAddress(t *testing.T) {
@ -396,7 +412,7 @@ func TestNewExporter_withAddress(t *testing.T) {
otlp.WithAddress(mc.address)) otlp.WithAddress(mc.address))
defer func() { defer func() {
_ = exp.Stop() _ = exp.Shutdown(context.Background())
}() }()
if err := exp.Start(); err != nil { if err := exp.Start(); err != nil {
@ -416,10 +432,10 @@ func TestNewExporter_withHeaders(t *testing.T) {
otlp.WithAddress(mc.address), otlp.WithAddress(mc.address),
otlp.WithHeaders(map[string]string{"header1": "value1"}), otlp.WithHeaders(map[string]string{"header1": "value1"}),
) )
exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "in the midst"}}) require.NoError(t, exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "in the midst"}}))
defer func() { defer func() {
_ = exp.Stop() _ = exp.Shutdown(context.Background())
}() }()
headers := mc.getHeaders() headers := mc.getHeaders()
@ -443,16 +459,18 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) {
) )
defer func() { defer func() {
_ = exp.Stop() _ = exp.Shutdown(context.Background())
}() }()
tp, err := sdktrace.NewProvider( tp := sdktrace.NewProvider(
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithBatcher(exp, // add following two options to ensure flush sdktrace.WithBatcher(
sdktrace.WithBatchTimeout(15*time.Millisecond), exp,
// add following two options to ensure flush
sdktrace.WithBatchTimeout(5),
sdktrace.WithMaxExportBatchSize(10), sdktrace.WithMaxExportBatchSize(10),
)) ),
assert.NoError(t, err) )
tr := tp.Tracer("test-tracer") tr := tp.Tracer("test-tracer")
testKvs := []label.KeyValue{ testKvs := []label.KeyValue{
@ -480,7 +498,9 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) {
<-time.After(40 * time.Millisecond) <-time.After(40 * time.Millisecond)
// Now shutdown the exporter // Now shutdown the exporter
if err := exp.Stop(); err != nil { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf("failed to stop the exporter: %v", err) t.Fatalf("failed to stop the exporter: %v", err)
} }

View File

@ -353,7 +353,7 @@ func TestExportSpans(t *testing.T) {
}, },
} { } {
tsc.Reset() tsc.Reset()
exp.ExportSpans(context.Background(), test.sd) assert.NoError(t, exp.ExportSpans(context.Background(), test.sd))
assert.ElementsMatch(t, test.want, tsc.ResourceSpans()) assert.ElementsMatch(t, test.want, tsc.ResourceSpans())
} }
} }

View File

@ -0,0 +1,93 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlp
import (
"context"
"errors"
"testing"
"time"
)
func TestExporterShutdownHonorsTimeout(t *testing.T) {
orig := closeStopCh
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer func() {
cancel()
closeStopCh = orig
}()
closeStopCh = func(stopCh chan bool) {
go func() {
<-ctx.Done()
close(stopCh)
}()
}
e := NewUnstartedExporter()
if err := e.Start(); err != nil {
t.Fatalf("failed to start exporter: %v", err)
}
innerCtx, innerCancel := context.WithTimeout(ctx, time.Microsecond)
if err := e.Shutdown(innerCtx); err == nil {
t.Error("expected context DeadlineExceeded error, got nil")
} else if !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("expected context DeadlineExceeded error, got %v", err)
}
innerCancel()
}
func TestExporterShutdownHonorsCancel(t *testing.T) {
orig := closeStopCh
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer func() {
cancel()
closeStopCh = orig
}()
closeStopCh = func(stopCh chan bool) {
go func() {
<-ctx.Done()
close(stopCh)
}()
}
e := NewUnstartedExporter()
if err := e.Start(); err != nil {
t.Fatalf("failed to start exporter: %v", err)
}
var innerCancel context.CancelFunc
ctx, innerCancel = context.WithCancel(ctx)
innerCancel()
if err := e.Shutdown(ctx); err == nil {
t.Error("expected context canceled error, got nil")
} else if !errors.Is(err, context.Canceled) {
t.Errorf("expected context canceled error, got %v", err)
}
}
func TestExporterShutdownNoError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
e := NewUnstartedExporter()
if err := e.Start(); err != nil {
t.Fatalf("failed to start exporter: %v", err)
}
if err := e.Shutdown(ctx); err != nil {
t.Errorf("shutdown errored: expected nil, got %v", err)
}
}

View File

@ -32,8 +32,7 @@ type Exporter struct {
var ( var (
_ metric.Exporter = &Exporter{} _ metric.Exporter = &Exporter{}
_ trace.SpanSyncer = &Exporter{} _ trace.SpanExporter = &Exporter{}
_ trace.SpanBatcher = &Exporter{}
) )
// NewExporter creates an Exporter with the passed options. // NewExporter creates an Exporter with the passed options.
@ -43,7 +42,7 @@ func NewExporter(options ...Option) (*Exporter, error) {
return nil, err return nil, err
} }
return &Exporter{ return &Exporter{
traceExporter: traceExporter{config}, traceExporter: traceExporter{config: config},
metricExporter: metricExporter{config}, metricExporter: metricExporter{config},
}, nil }, nil
} }
@ -57,11 +56,7 @@ func NewExportPipeline(exportOpts []Option, pushOpts []push.Option) (apitrace.Pr
return nil, nil, err return nil, nil, err
} }
tp, err := sdktrace.NewProvider(sdktrace.WithSyncer(exporter)) tp := sdktrace.NewProvider(sdktrace.WithBatcher(exporter))
if err != nil {
return nil, nil, err
}
pusher := push.New( pusher := push.New(
basic.New( basic.New(
simple.NewWithExactDistribution(), simple.NewWithExactDistribution(),

View File

@ -18,6 +18,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync"
"go.opentelemetry.io/otel/sdk/export/trace" "go.opentelemetry.io/otel/sdk/export/trace"
) )
@ -25,28 +26,43 @@ import (
// Exporter is an implementation of trace.SpanSyncer that writes spans to stdout. // Exporter is an implementation of trace.SpanSyncer that writes spans to stdout.
type traceExporter struct { type traceExporter struct {
config Config config Config
}
// ExportSpan writes a SpanData in json format to stdout. stoppedMu sync.RWMutex
func (e *traceExporter) ExportSpan(ctx context.Context, data *trace.SpanData) { stopped bool
if e.config.DisableTraceExport {
return
}
e.ExportSpans(ctx, []*trace.SpanData{data})
} }
// ExportSpans writes SpanData in json format to stdout. // ExportSpans writes SpanData in json format to stdout.
func (e *traceExporter) ExportSpans(ctx context.Context, data []*trace.SpanData) { func (e *traceExporter) ExportSpans(ctx context.Context, data []*trace.SpanData) error {
e.stoppedMu.RLock()
stopped := e.stopped
e.stoppedMu.RUnlock()
if stopped {
return nil
}
if e.config.DisableTraceExport || len(data) == 0 { if e.config.DisableTraceExport || len(data) == 0 {
return return nil
} }
out, err := e.marshal(data) out, err := e.marshal(data)
if err != nil { if err != nil {
fmt.Fprintf(e.config.Writer, "error converting spanData to json: %v", err) return err
return
} }
fmt.Fprintln(e.config.Writer, string(out)) _, err = fmt.Fprintln(e.config.Writer, string(out))
return err
}
// Shutdown is called to stop the exporter, it preforms no action.
func (e *traceExporter) Shutdown(ctx context.Context) error {
e.stoppedMu.Lock()
e.stopped = true
e.stoppedMu.Unlock()
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
} }
// marshal v with approriate indentation. // marshal v with approriate indentation.

View File

@ -18,6 +18,7 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"testing" "testing"
"time" "time"
@ -67,7 +68,9 @@ func TestExporter_ExportSpan(t *testing.T) {
StatusMessage: "interesting", StatusMessage: "interesting",
Resource: resource, Resource: resource,
} }
ex.ExportSpan(context.Background(), testSpan) if err := ex.ExportSpans(context.Background(), []*export.SpanData{testSpan}); err != nil {
t.Fatal(err)
}
expectedSerializedNow, _ := json.Marshal(now) expectedSerializedNow, _ := json.Marshal(now)
@ -133,3 +136,51 @@ func TestExporter_ExportSpan(t *testing.T) {
t.Errorf("Want: %v but got: %v", expectedOutput, got) t.Errorf("Want: %v but got: %v", expectedOutput, got)
} }
} }
func TestExporterShutdownHonorsTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
e, err := stdout.NewExporter()
if err != nil {
t.Fatalf("failed to create exporter: %v", err)
}
innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
defer innerCancel()
<-innerCtx.Done()
if err := e.Shutdown(innerCtx); err == nil {
t.Error("expected context DeadlineExceeded error, got nil")
} else if !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("expected context DeadlineExceeded error, got %v", err)
}
}
func TestExporterShutdownHonorsCancel(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
e, err := stdout.NewExporter()
if err != nil {
t.Fatalf("failed to create exporter: %v", err)
}
innerCtx, innerCancel := context.WithCancel(ctx)
innerCancel()
if err := e.Shutdown(innerCtx); err == nil {
t.Error("expected context canceled error, got nil")
} else if !errors.Is(err, context.Canceled) {
t.Errorf("expected context canceled error, got %v", err)
}
}
func TestExporterShutdownNoError(t *testing.T) {
e, err := stdout.NewExporter()
if err != nil {
t.Fatalf("failed to create exporter: %v", err)
}
if err := e.Shutdown(context.Background()); err != nil {
t.Errorf("shutdown errored: expected nil, got %v", err)
}
}

View File

@ -17,6 +17,8 @@ package jaeger
import ( import (
"context" "context"
"encoding/binary" "encoding/binary"
"fmt"
"sync"
"google.golang.org/api/support/bundler" "google.golang.org/api/support/bundler"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -163,15 +165,12 @@ func NewExportPipeline(endpointOption EndpointOption, opts ...Option) (apitrace.
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
syncer := sdktrace.WithSyncer(exporter)
tp, err := sdktrace.NewProvider(syncer)
if err != nil {
return nil, nil, err
}
if exporter.o.Config != nil {
tp.ApplyConfig(*exporter.o.Config)
}
pOpts := []sdktrace.ProviderOption{sdktrace.WithSyncer(exporter)}
if exporter.o.Config != nil {
pOpts = append(pOpts, sdktrace.WithConfig(*exporter.o.Config))
}
tp := sdktrace.NewProvider(pOpts...)
return tp, exporter.Flush, nil return tp, exporter.Flush, nil
} }
@ -203,14 +202,61 @@ type Exporter struct {
bundler *bundler.Bundler bundler *bundler.Bundler
uploader batchUploader uploader batchUploader
o options o options
stoppedMu sync.RWMutex
stopped bool
} }
var _ export.SpanSyncer = (*Exporter)(nil) var _ export.SpanExporter = (*Exporter)(nil)
// ExportSpan exports a SpanData to Jaeger. // ExportSpans exports SpanData to Jaeger.
func (e *Exporter) ExportSpan(ctx context.Context, d *export.SpanData) { func (e *Exporter) ExportSpans(ctx context.Context, spans []*export.SpanData) error {
_ = e.bundler.Add(spanDataToThrift(d), 1) e.stoppedMu.RLock()
stopped := e.stopped
e.stoppedMu.RUnlock()
if stopped {
return nil
}
for _, span := range spans {
// TODO(jbd): Handle oversized bundlers. // TODO(jbd): Handle oversized bundlers.
err := e.bundler.Add(spanDataToThrift(span), 1)
if err != nil {
return fmt.Errorf("failed to bundle %q: %w", span.Name, err)
}
}
return nil
}
// flush is used to wrap the bundler's Flush method for testing.
var flush = func(e *Exporter) {
e.bundler.Flush()
}
// Shutdown stops the exporter flushing any pending exports.
func (e *Exporter) Shutdown(ctx context.Context) error {
e.stoppedMu.Lock()
e.stopped = true
e.stoppedMu.Unlock()
done := make(chan struct{}, 1)
// Shadow so if the goroutine is leaked in testing it doesn't cause a race
// condition when the file level var is reset.
go func(FlushFunc func(*Exporter)) {
// The OpenTelemetry specification is explicit in not having this
// method block so the preference here is to orphan this goroutine if
// the context is canceled or times out while this flushing is
// occurring. This is a consequence of the bundler Flush method not
// supporting a context.
FlushFunc(e)
done <- struct{}{}
}(flush)
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
}
return nil
} }
func spanDataToThrift(data *export.SpanData) *gen.Span { func spanDataToThrift(data *export.SpanData) *gen.Span {
@ -388,7 +434,7 @@ func getBoolTag(k string, b bool) *gen.Tag {
// //
// This is useful if your program is ending and you do not want to lose recent spans. // This is useful if your program is ending and you do not want to lose recent spans.
func (e *Exporter) Flush() { func (e *Exporter) Flush() {
e.bundler.Flush() flush(e)
} }
func (e *Exporter) upload(spans []*gen.Span) error { func (e *Exporter) upload(spans []*gen.Span) error {

View File

@ -339,12 +339,10 @@ func TestExporter_ExportSpan(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
tp, err := sdktrace.NewProvider( tp := sdktrace.NewProvider(
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithSyncer(exp)) sdktrace.WithSyncer(exp),
)
assert.NoError(t, err)
global.SetTracerProvider(tp) global.SetTracerProvider(tp)
_, span := global.Tracer("test-tracer").Start(context.Background(), "test-span") _, span := global.Tracer("test-tracer").Start(context.Background(), "test-span")
span.End() span.End()
@ -481,3 +479,50 @@ func Test_spanDataToThrift(t *testing.T) {
}) })
} }
} }
func TestExporterShutdownHonorsCancel(t *testing.T) {
orig := flush
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
// Do this after the parent context is canceled to avoid a race.
defer func() {
<-ctx.Done()
flush = orig
}()
defer cancel()
flush = func(e *Exporter) {
<-ctx.Done()
}
e, err := NewRawExporter(withTestCollectorEndpoint())
require.NoError(t, err)
innerCtx, innerCancel := context.WithCancel(ctx)
go innerCancel()
assert.Errorf(t, e.Shutdown(innerCtx), context.Canceled.Error())
}
func TestExporterShutdownHonorsTimeout(t *testing.T) {
orig := flush
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
// Do this after the parent context is canceled to avoid a race.
defer func() {
<-ctx.Done()
flush = orig
}()
defer cancel()
flush = func(e *Exporter) {
<-ctx.Done()
}
e, err := NewRawExporter(withTestCollectorEndpoint())
require.NoError(t, err)
innerCtx, innerCancel := context.WithTimeout(ctx, time.Microsecond*10)
assert.Errorf(t, e.Shutdown(innerCtx), context.DeadlineExceeded.Error())
innerCancel()
}
func TestErrorOnExportShutdownExporter(t *testing.T) {
e, err := NewRawExporter(withTestCollectorEndpoint())
require.NoError(t, err)
assert.NoError(t, e.Shutdown(context.Background()))
assert.NoError(t, e.ExportSpans(context.Background(), nil))
}

View File

@ -24,6 +24,7 @@ import (
"log" "log"
"net/http" "net/http"
"net/url" "net/url"
"sync"
"go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/global"
export "go.opentelemetry.io/otel/sdk/export/trace" export "go.opentelemetry.io/otel/sdk/export/trace"
@ -39,10 +40,13 @@ type Exporter struct {
client *http.Client client *http.Client
logger *log.Logger logger *log.Logger
o options o options
stoppedMu sync.RWMutex
stopped bool
} }
var ( var (
_ export.SpanBatcher = &Exporter{} _ export.SpanExporter = &Exporter{}
) )
// Options contains configuration for the exporter. // Options contains configuration for the exporter.
@ -113,11 +117,7 @@ func NewExportPipeline(collectorURL, serviceName string, opts ...Option) (*sdktr
return nil, err return nil, err
} }
batcher := sdktrace.WithBatcher(exp) tp := sdktrace.NewProvider(sdktrace.WithBatcher(exp))
tp, err := sdktrace.NewProvider(batcher)
if err != nil {
return nil, err
}
if exp.o.config != nil { if exp.o.config != nil {
tp.ApplyConfig(*exp.o.config) tp.ApplyConfig(*exp.o.config)
} }
@ -137,42 +137,63 @@ func InstallNewPipeline(collectorURL, serviceName string, opts ...Option) error
return nil return nil
} }
// ExportSpans is a part of an implementation of the SpanBatcher // ExportSpans exports SpanData to a Zipkin receiver.
// interface. func (e *Exporter) ExportSpans(ctx context.Context, batch []*export.SpanData) error {
func (e *Exporter) ExportSpans(ctx context.Context, batch []*export.SpanData) { e.stoppedMu.RLock()
stopped := e.stopped
e.stoppedMu.RUnlock()
if stopped {
e.logf("exporter stopped, not exporting span batch")
return nil
}
if len(batch) == 0 { if len(batch) == 0 {
e.logf("no spans to export") e.logf("no spans to export")
return return nil
} }
models := toZipkinSpanModels(batch, e.serviceName) models := toZipkinSpanModels(batch, e.serviceName)
body, err := json.Marshal(models) body, err := json.Marshal(models)
if err != nil { if err != nil {
e.logf("failed to serialize zipkin models to JSON: %v", err) return e.errf("failed to serialize zipkin models to JSON: %v", err)
return
} }
e.logf("about to send a POST request to %s with body %s", e.url, body) e.logf("about to send a POST request to %s with body %s", e.url, body)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, e.url, bytes.NewBuffer(body)) req, err := http.NewRequestWithContext(ctx, http.MethodPost, e.url, bytes.NewBuffer(body))
if err != nil { if err != nil {
e.logf("failed to create request to %s: %v", e.url, err) return e.errf("failed to create request to %s: %v", e.url, err)
return
} }
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
resp, err := e.client.Do(req) resp, err := e.client.Do(req)
if err != nil { if err != nil {
e.logf("request to %s failed: %v", e.url, err) return e.errf("request to %s failed: %v", e.url, err)
return
} }
e.logf("zipkin responded with status %d", resp.StatusCode) e.logf("zipkin responded with status %d", resp.StatusCode)
_, err = ioutil.ReadAll(resp.Body) _, err = ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
e.logf("failed to read response body: %v", err) // Best effort to clean up here.
resp.Body.Close()
return e.errf("failed to read response body: %v", err)
} }
err = resp.Body.Close() err = resp.Body.Close()
if err != nil { if err != nil {
e.logf("failed to close response body: %v", err) return e.errf("failed to close response body: %v", err)
} }
return nil
}
// Shutdown stops the exporter flushing any pending exports.
func (e *Exporter) Shutdown(ctx context.Context) error {
e.stoppedMu.Lock()
e.stopped = true
e.stoppedMu.Unlock()
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
} }
func (e *Exporter) logf(format string, args ...interface{}) { func (e *Exporter) logf(format string, args ...interface{}) {
@ -180,3 +201,8 @@ func (e *Exporter) logf(format string, args ...interface{}) {
e.logger.Printf(format, args...) e.logger.Printf(format, args...)
} }
} }
func (e *Exporter) errf(format string, args ...interface{}) error {
e.logf(format, args...)
return fmt.Errorf(format, args...)
}

View File

@ -339,16 +339,16 @@ func TestExportSpans(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
ctx := context.Background() ctx := context.Background()
require.Len(t, ls.Messages, 0) require.Len(t, ls.Messages, 0)
exporter.ExportSpans(ctx, spans[0:1]) require.NoError(t, exporter.ExportSpans(ctx, spans[0:1]))
require.Len(t, ls.Messages, 2) require.Len(t, ls.Messages, 2)
require.Contains(t, ls.Messages[0], "send a POST request") require.Contains(t, ls.Messages[0], "send a POST request")
require.Contains(t, ls.Messages[1], "zipkin responded") require.Contains(t, ls.Messages[1], "zipkin responded")
ls.Messages = nil ls.Messages = nil
exporter.ExportSpans(ctx, nil) require.NoError(t, exporter.ExportSpans(ctx, nil))
require.Len(t, ls.Messages, 1) require.Len(t, ls.Messages, 1)
require.Contains(t, ls.Messages[0], "no spans to export") require.Contains(t, ls.Messages[0], "no spans to export")
ls.Messages = nil ls.Messages = nil
exporter.ExportSpans(ctx, spans[1:2]) require.NoError(t, exporter.ExportSpans(ctx, spans[1:2]))
require.Contains(t, ls.Messages[0], "send a POST request") require.Contains(t, ls.Messages[0], "send a POST request")
require.Contains(t, ls.Messages[1], "zipkin responded") require.Contains(t, ls.Messages[1], "zipkin responded")
checkFunc := func() bool { checkFunc := func() bool {
@ -357,3 +357,35 @@ func TestExportSpans(t *testing.T) {
require.Eventually(t, checkFunc, time.Second, 10*time.Millisecond) require.Eventually(t, checkFunc, time.Second, 10*time.Millisecond)
require.Equal(t, models, collector.StealModels()) require.Equal(t, models, collector.StealModels())
} }
func TestExporterShutdownHonorsTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
exp, err := NewRawExporter(collectorURL, serviceName)
require.NoError(t, err)
innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
defer innerCancel()
<-innerCtx.Done()
assert.Errorf(t, exp.Shutdown(innerCtx), context.DeadlineExceeded.Error())
}
func TestExporterShutdownHonorsCancel(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
exp, err := NewRawExporter(collectorURL, serviceName)
require.NoError(t, err)
innerCtx, innerCancel := context.WithCancel(ctx)
innerCancel()
assert.Errorf(t, exp.Shutdown(innerCtx), context.Canceled.Error())
}
func TestErrorOnExportShutdownExporter(t *testing.T) {
exp, err := NewRawExporter(collectorURL, serviceName)
require.NoError(t, err)
assert.NoError(t, exp.Shutdown(context.Background()))
assert.NoError(t, exp.ExportSpans(context.Background(), nil))
}

View File

@ -26,28 +26,29 @@ import (
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
) )
// SpanSyncer is a type for functions that receive a single sampled trace span. // SpanExporter handles the delivery of SpanData to external receivers. This is
// // the final component in the trace export pipeline.
// The ExportSpan method is called synchronously. Therefore, it should not take type SpanExporter interface {
// forever to process the span. // ExportSpans exports a batch of SpanData.
// //
// The SpanData should not be modified. // This function is called synchronously, so there is no concurrency
type SpanSyncer interface { // safety requirement. However, due to the synchronous calling pattern,
ExportSpan(context.Context, *SpanData) // it is critical that all timeouts and cancellations contained in the
// passed context must be honored.
//
// Any retry logic must be contained in this function. The SDK that
// calls this function will not implement any retry logic. All errors
// returned by this function are considered unrecoverable and will be
// reported to a configured error Handler.
ExportSpans(context.Context, []*SpanData) error
// Shutdown notifies the exporter of a pending halt to operations. The
// exporter is expected to preform any cleanup or synchronization it
// requires while honoring all timeouts and cancellations contained in
// the passed context.
Shutdown(context.Context) error
} }
// SpanBatcher is a type for functions that receive batched of sampled trace // SpanData contains all the information collected by a completed span.
// spans.
//
// The ExportSpans method is called asynchronously. However its should not take
// forever to process the spans.
//
// The SpanData should not be modified.
type SpanBatcher interface {
ExportSpans(context.Context, []*SpanData)
}
// SpanData contains all the information collected by a span.
type SpanData struct { type SpanData struct {
SpanContext apitrace.SpanContext SpanContext apitrace.SpanContext
ParentSpanID apitrace.SpanID ParentSpanID apitrace.SpanID
@ -74,17 +75,16 @@ type SpanData struct {
Resource *resource.Resource Resource *resource.Resource
// InstrumentationLibrary defines the instrumentation library used to // InstrumentationLibrary defines the instrumentation library used to
// providing instrumentation. // provide instrumentation.
InstrumentationLibrary instrumentation.Library InstrumentationLibrary instrumentation.Library
} }
// Event is used to describe an Event with a message string and set of // Event is thing that happened during a Span's lifetime.
// Attributes.
type Event struct { type Event struct {
// Name is the name of this event // Name is the name of this event
Name string Name string
// Attributes contains a list of key-value pairs. // Attributes describe the aspects of the event.
Attributes []label.KeyValue Attributes []label.KeyValue
// Time is the time at which this event was recorded. // Time is the time at which this event was recorded.

View File

@ -23,51 +23,48 @@ import (
"go.opentelemetry.io/otel/sdk/export/trace" "go.opentelemetry.io/otel/sdk/export/trace"
) )
var _ trace.SpanBatcher = (*NoopExporter)(nil) var _ trace.SpanExporter = (*NoopExporter)(nil)
var _ trace.SpanSyncer = (*NoopExporter)(nil)
// NewNoopExporter returns a new no-op exporter. // NewNoopExporter returns a new no-op exporter.
// It implements both trace.SpanBatcher and trace.SpanSyncer.
func NewNoopExporter() *NoopExporter { func NewNoopExporter() *NoopExporter {
return new(NoopExporter) return new(NoopExporter)
} }
// NoopExporter is an exporter that does nothing. // NoopExporter is an exporter that drops all received SpanData and performs
// no action.
type NoopExporter struct{} type NoopExporter struct{}
// ExportSpans implements the trace.SpanBatcher interface. // ExportSpans handles export of SpanData by dropping it.
func (nsb *NoopExporter) ExportSpans(context.Context, []*trace.SpanData) {} func (nsb *NoopExporter) ExportSpans(context.Context, []*trace.SpanData) error { return nil }
// ExportSpan implements the trace.SpanSyncer interface. // Shutdown stops the exporter by doing nothing.
func (nsb *NoopExporter) ExportSpan(context.Context, *trace.SpanData) {} func (nsb *NoopExporter) Shutdown(context.Context) error { return nil }
var _ trace.SpanBatcher = (*InMemoryExporter)(nil) var _ trace.SpanExporter = (*InMemoryExporter)(nil)
var _ trace.SpanSyncer = (*InMemoryExporter)(nil)
// NewInMemoryExporter returns a new trace.SpanBatcher that stores in-memory all exported spans. // NewInMemoryExporter returns a new InMemoryExporter.
// It implements both trace.SpanBatcher and trace.SpanSyncer.
func NewInMemoryExporter() *InMemoryExporter { func NewInMemoryExporter() *InMemoryExporter {
return new(InMemoryExporter) return new(InMemoryExporter)
} }
// InMemoryExporter is an exporter that stores in-memory all exported spans. // InMemoryExporter is an exporter that stores all received spans in-memory.
type InMemoryExporter struct { type InMemoryExporter struct {
mu sync.Mutex mu sync.Mutex
sds []*trace.SpanData sds []*trace.SpanData
} }
// ExportSpans implements the trace.SpanBatcher interface. // ExportSpans handles export of SpanData by storing it in memory.
func (imsb *InMemoryExporter) ExportSpans(_ context.Context, sds []*trace.SpanData) { func (imsb *InMemoryExporter) ExportSpans(_ context.Context, sds []*trace.SpanData) error {
imsb.mu.Lock() imsb.mu.Lock()
defer imsb.mu.Unlock() defer imsb.mu.Unlock()
imsb.sds = append(imsb.sds, sds...) imsb.sds = append(imsb.sds, sds...)
return nil
} }
// ExportSpan implements the trace.SpanSyncer interface. // Shutdown stops the exporter by clearing SpanData held in memory.
func (imsb *InMemoryExporter) ExportSpan(_ context.Context, sd *trace.SpanData) { func (imsb *InMemoryExporter) Shutdown(context.Context) error {
imsb.mu.Lock() imsb.Reset()
defer imsb.mu.Unlock() return nil
imsb.sds = append(imsb.sds, sd)
} }
// Reset the current in-memory storage. // Reset the current in-memory storage.

View File

@ -19,6 +19,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/export/trace" "go.opentelemetry.io/otel/sdk/export/trace"
) )
@ -27,23 +28,22 @@ import (
func TestNoop(t *testing.T) { func TestNoop(t *testing.T) {
nsb := NewNoopExporter() nsb := NewNoopExporter()
nsb.ExportSpans(context.Background(), nil) require.NoError(t, nsb.ExportSpans(context.Background(), nil))
nsb.ExportSpans(context.Background(), make([]*trace.SpanData, 10)) require.NoError(t, nsb.ExportSpans(context.Background(), make([]*trace.SpanData, 10)))
nsb.ExportSpans(context.Background(), make([]*trace.SpanData, 0, 10)) require.NoError(t, nsb.ExportSpans(context.Background(), make([]*trace.SpanData, 0, 10)))
nsb.ExportSpan(context.Background(), nil)
} }
func TestNewInMemoryExporter(t *testing.T) { func TestNewInMemoryExporter(t *testing.T) {
imsb := NewInMemoryExporter() imsb := NewInMemoryExporter()
imsb.ExportSpans(context.Background(), nil) require.NoError(t, imsb.ExportSpans(context.Background(), nil))
assert.Len(t, imsb.GetSpans(), 0) assert.Len(t, imsb.GetSpans(), 0)
input := make([]*trace.SpanData, 10) input := make([]*trace.SpanData, 10)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
input[i] = new(trace.SpanData) input[i] = new(trace.SpanData)
} }
imsb.ExportSpans(context.Background(), input) require.NoError(t, imsb.ExportSpans(context.Background(), input))
sds := imsb.GetSpans() sds := imsb.GetSpans()
assert.Len(t, sds, 10) assert.Len(t, sds, 10)
for i, sd := range sds { for i, sd := range sds {
@ -54,7 +54,7 @@ func TestNewInMemoryExporter(t *testing.T) {
assert.Len(t, sds, 10) assert.Len(t, sds, 10)
assert.Len(t, imsb.GetSpans(), 0) assert.Len(t, imsb.GetSpans(), 0)
imsb.ExportSpan(context.Background(), input[0]) require.NoError(t, imsb.ExportSpans(context.Background(), input[0:1]))
sds = imsb.GetSpans() sds = imsb.GetSpans()
assert.Len(t, sds, 1) assert.Len(t, sds, 1)
assert.Same(t, input[0], sds[0]) assert.Same(t, input[0], sds[0])

View File

@ -16,12 +16,12 @@ package trace
import ( import (
"context" "context"
"errors"
"runtime" "runtime"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"go.opentelemetry.io/otel/api/global"
export "go.opentelemetry.io/otel/sdk/export/trace" export "go.opentelemetry.io/otel/sdk/export/trace"
) )
@ -31,10 +31,6 @@ const (
DefaultMaxExportBatchSize = 512 DefaultMaxExportBatchSize = 512
) )
var (
errNilExporter = errors.New("exporter is nil")
)
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions) type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
type BatchSpanProcessorOptions struct { type BatchSpanProcessorOptions struct {
@ -61,11 +57,10 @@ type BatchSpanProcessorOptions struct {
BlockOnQueueFull bool BlockOnQueueFull bool
} }
// BatchSpanProcessor implements SpanProcessor interfaces. It is used by // BatchSpanProcessor is a SpanProcessor that batches asynchronously received
// exporters to receive export.SpanData asynchronously. // SpanData and sends it to a trace.Exporter when complete.
// Use BatchSpanProcessorOptions to change the behavior of the processor.
type BatchSpanProcessor struct { type BatchSpanProcessor struct {
e export.SpanBatcher e export.SpanExporter
o BatchSpanProcessorOptions o BatchSpanProcessorOptions
queue chan *export.SpanData queue chan *export.SpanData
@ -80,25 +75,24 @@ type BatchSpanProcessor struct {
var _ SpanProcessor = (*BatchSpanProcessor)(nil) var _ SpanProcessor = (*BatchSpanProcessor)(nil)
// NewBatchSpanProcessor creates a new instance of BatchSpanProcessor // NewBatchSpanProcessor creates a new BatchSpanProcessor that will send
// for a given export. It returns an error if exporter is nil. // SpanData batches to the exporters with the supplied options.
// The newly created BatchSpanProcessor should then be registered with sdk //
// using RegisterSpanProcessor. // The returned BatchSpanProcessor needs to be registered with the SDK using
func NewBatchSpanProcessor(e export.SpanBatcher, opts ...BatchSpanProcessorOption) (*BatchSpanProcessor, error) { // the RegisterSpanProcessor method for it to process spans.
if e == nil { //
return nil, errNilExporter // If the exporter is nil, the span processor will preform no action.
} func NewBatchSpanProcessor(exporter export.SpanExporter, options ...BatchSpanProcessorOption) *BatchSpanProcessor {
o := BatchSpanProcessorOptions{ o := BatchSpanProcessorOptions{
BatchTimeout: DefaultBatchTimeout, BatchTimeout: DefaultBatchTimeout,
MaxQueueSize: DefaultMaxQueueSize, MaxQueueSize: DefaultMaxQueueSize,
MaxExportBatchSize: DefaultMaxExportBatchSize, MaxExportBatchSize: DefaultMaxExportBatchSize,
} }
for _, opt := range opts { for _, opt := range options {
opt(&o) opt(&o)
} }
bsp := &BatchSpanProcessor{ bsp := &BatchSpanProcessor{
e: e, e: exporter,
o: o, o: o,
batch: make([]*export.SpanData, 0, o.MaxExportBatchSize), batch: make([]*export.SpanData, 0, o.MaxExportBatchSize),
timer: time.NewTimer(o.BatchTimeout), timer: time.NewTimer(o.BatchTimeout),
@ -113,15 +107,18 @@ func NewBatchSpanProcessor(e export.SpanBatcher, opts ...BatchSpanProcessorOptio
bsp.drainQueue() bsp.drainQueue()
}() }()
return bsp, nil return bsp
} }
// OnStart method does nothing. // OnStart method does nothing.
func (bsp *BatchSpanProcessor) OnStart(sd *export.SpanData) { func (bsp *BatchSpanProcessor) OnStart(sd *export.SpanData) {}
}
// OnEnd method enqueues export.SpanData for later processing. // OnEnd method enqueues export.SpanData for later processing.
func (bsp *BatchSpanProcessor) OnEnd(sd *export.SpanData) { func (bsp *BatchSpanProcessor) OnEnd(sd *export.SpanData) {
// Do not enqueue spans if we are just going to drop them.
if bsp.e == nil {
return
}
bsp.enqueue(sd) bsp.enqueue(sd)
} }
@ -163,7 +160,9 @@ func (bsp *BatchSpanProcessor) exportSpans() {
bsp.timer.Reset(bsp.o.BatchTimeout) bsp.timer.Reset(bsp.o.BatchTimeout)
if len(bsp.batch) > 0 { if len(bsp.batch) > 0 {
bsp.e.ExportSpans(context.Background(), bsp.batch) if err := bsp.e.ExportSpans(context.Background(), bsp.batch); err != nil {
global.Handle(err)
}
bsp.batch = bsp.batch[:0] bsp.batch = bsp.batch[:0]
} }
} }

View File

@ -33,15 +33,18 @@ type testBatchExporter struct {
batchCount int batchCount int
} }
func (t *testBatchExporter) ExportSpans(ctx context.Context, sds []*export.SpanData) { func (t *testBatchExporter) ExportSpans(ctx context.Context, sds []*export.SpanData) error {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
t.spans = append(t.spans, sds...) t.spans = append(t.spans, sds...)
t.sizes = append(t.sizes, len(sds)) t.sizes = append(t.sizes, len(sds))
t.batchCount++ t.batchCount++
return nil
} }
func (t *testBatchExporter) Shutdown(context.Context) error { return nil }
func (t *testBatchExporter) len() int { func (t *testBatchExporter) len() int {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
@ -54,13 +57,14 @@ func (t *testBatchExporter) getBatchCount() int {
return t.batchCount return t.batchCount
} }
var _ export.SpanBatcher = (*testBatchExporter)(nil) var _ export.SpanExporter = (*testBatchExporter)(nil)
func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) { func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
_, err := sdktrace.NewBatchSpanProcessor(nil) bsp := sdktrace.NewBatchSpanProcessor(nil)
if err == nil { // These should not panic.
t.Errorf("Expected error while creating processor with nil exporter") bsp.OnStart(&export.SpanData{})
} bsp.OnEnd(&export.SpanData{})
bsp.Shutdown()
} }
type testOption struct { type testOption struct {
@ -149,7 +153,7 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
t.Run(option.name, func(t *testing.T) { t.Run(option.name, func(t *testing.T) {
te := testBatchExporter{} te := testBatchExporter{}
tp := basicProvider(t) tp := basicProvider(t)
ssp := createAndRegisterBatchSP(t, option, &te) ssp := createAndRegisterBatchSP(option, &te)
if ssp == nil { if ssp == nil {
t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name) t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
} }
@ -176,14 +180,10 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
} }
} }
func createAndRegisterBatchSP(t *testing.T, option testOption, te *testBatchExporter) *sdktrace.BatchSpanProcessor { func createAndRegisterBatchSP(option testOption, te *testBatchExporter) *sdktrace.BatchSpanProcessor {
// Always use blocking queue to avoid flaky tests. // Always use blocking queue to avoid flaky tests.
options := append(option.o, sdktrace.WithBlocking()) options := append(option.o, sdktrace.WithBlocking())
ssp, err := sdktrace.NewBatchSpanProcessor(te, options...) return sdktrace.NewBatchSpanProcessor(te, options...)
if ssp == nil {
t.Errorf("%s: Error creating new instance of BatchSpanProcessor, error: %v\n", option.name, err)
}
return ssp
} }
func generateSpan(t *testing.T, parallel bool, tr apitrace.Tracer, option testOption) { func generateSpan(t *testing.T, parallel bool, tr apitrace.Tracer, option testOption) {
@ -219,14 +219,7 @@ func getSpanContext() apitrace.SpanContext {
} }
func TestBatchSpanProcessorShutdown(t *testing.T) { func TestBatchSpanProcessorShutdown(t *testing.T) {
bsp, err := sdktrace.NewBatchSpanProcessor(&testBatchExporter{}) bsp := sdktrace.NewBatchSpanProcessor(&testBatchExporter{})
if err != nil {
t.Errorf("Unexpected error while creating processor\n")
}
if bsp == nil {
t.Fatalf("Error creating new instance of BatchSpanProcessor\n")
}
bsp.Shutdown() bsp.Shutdown()

View File

@ -166,9 +166,6 @@ func traceBenchmark(b *testing.B, name string, fn func(*testing.B, apitrace.Trac
} }
func tracer(b *testing.B, name string, sampler sdktrace.Sampler) apitrace.Tracer { func tracer(b *testing.B, name string, sampler sdktrace.Sampler) apitrace.Tracer {
tp, err := sdktrace.NewProvider(sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sampler})) tp := sdktrace.NewProvider(sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sampler}))
if err != nil {
b.Fatalf("Failed to create trace provider for test %s\n", name)
}
return tp.Tracer(name) return tp.Tracer(name)
} }

View File

@ -29,16 +29,12 @@ const (
defaultTracerName = "go.opentelemetry.io/otel/sdk/tracer" defaultTracerName = "go.opentelemetry.io/otel/sdk/tracer"
) )
// batcher contains export.SpanBatcher and its options. // TODO (MrAlias): unify this API option design:
type batcher struct { // https://github.com/open-telemetry/opentelemetry-go/issues/536
b export.SpanBatcher
opts []BatchSpanProcessorOption
}
// ProviderOptions // ProviderOptions
type ProviderOptions struct { type ProviderOptions struct {
syncers []export.SpanSyncer processors []SpanProcessor
batchers []batcher
config Config config Config
} }
@ -56,7 +52,7 @@ var _ apitrace.Provider = &Provider{}
// NewProvider creates an instance of trace provider. Optional // NewProvider creates an instance of trace provider. Optional
// parameter configures the provider with common options applicable // parameter configures the provider with common options applicable
// to all tracer instances that will be created by this provider. // to all tracer instances that will be created by this provider.
func NewProvider(opts ...ProviderOption) (*Provider, error) { func NewProvider(opts ...ProviderOption) *Provider {
o := &ProviderOptions{} o := &ProviderOptions{}
for _, opt := range opts { for _, opt := range opts {
@ -74,22 +70,13 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) {
MaxLinksPerSpan: DefaultMaxLinksPerSpan, MaxLinksPerSpan: DefaultMaxLinksPerSpan,
}) })
for _, syncer := range o.syncers { for _, sp := range o.processors {
ssp := NewSimpleSpanProcessor(syncer) tp.RegisterSpanProcessor(sp)
tp.RegisterSpanProcessor(ssp)
}
for _, batcher := range o.batchers {
bsp, err := NewBatchSpanProcessor(batcher.b, batcher.opts...)
if err != nil {
return nil, err
}
tp.RegisterSpanProcessor(bsp)
} }
tp.ApplyConfig(o.config) tp.ApplyConfig(o.config)
return tp, nil return tp
} }
// Tracer with the given name. If a tracer for the given name does not exist, // Tracer with the given name. If a tracer for the given name does not exist,
@ -176,23 +163,22 @@ func (p *Provider) ApplyConfig(cfg Config) {
p.config.Store(&c) p.config.Store(&c)
} }
// WithSyncer options appends the syncer to the existing list of Syncers. // WithSyncer registers the exporter with the Provider using a
// This option can be used multiple times. // SimpleSpanProcessor.
// The Syncers are wrapped into SimpleSpanProcessors and registered func WithSyncer(e export.SpanExporter) ProviderOption {
// with the provider. return WithSpanProcessor(NewSimpleSpanProcessor(e))
func WithSyncer(syncer export.SpanSyncer) ProviderOption {
return func(opts *ProviderOptions) {
opts.syncers = append(opts.syncers, syncer)
}
} }
// WithBatcher options appends the batcher to the existing list of Batchers. // WithBatcher registers the exporter with the Provider using a
// This option can be used multiple times. // BatchSpanProcessor configured with the passed opts.
// The Batchers are wrapped into BatchedSpanProcessors and registered func WithBatcher(e export.SpanExporter, opts ...BatchSpanProcessorOption) ProviderOption {
// with the provider. return WithSpanProcessor(NewBatchSpanProcessor(e, opts...))
func WithBatcher(b export.SpanBatcher, bopts ...BatchSpanProcessorOption) ProviderOption { }
// WithSpanProcessor registers the SpanProcessor with a Provider.
func WithSpanProcessor(sp SpanProcessor) ProviderOption {
return func(opts *ProviderOptions) { return func(opts *ProviderOptions) {
opts.batchers = append(opts.batchers, batcher{b, bopts}) opts.processors = append(opts.processors, sp)
} }
} }

View File

@ -17,22 +17,23 @@ package trace
import ( import (
"context" "context"
"go.opentelemetry.io/otel/api/global"
export "go.opentelemetry.io/otel/sdk/export/trace" export "go.opentelemetry.io/otel/sdk/export/trace"
) )
// SimpleSpanProcessor implements SpanProcessor interfaces. It is used by // SimpleSpanProcessor is a SpanProcessor that synchronously sends all
// exporters to receive SpanData synchronously when span is finished. // SpanData to a trace.Exporter when the span finishes.
type SimpleSpanProcessor struct { type SimpleSpanProcessor struct {
e export.SpanSyncer e export.SpanExporter
} }
var _ SpanProcessor = (*SimpleSpanProcessor)(nil) var _ SpanProcessor = (*SimpleSpanProcessor)(nil)
// NewSimpleSpanProcessor creates a new instance of SimpleSpanProcessor // NewSimpleSpanProcessor returns a new SimpleSpanProcessor that will
// for a given export. // synchronously send SpanData to the exporter.
func NewSimpleSpanProcessor(e export.SpanSyncer) *SimpleSpanProcessor { func NewSimpleSpanProcessor(exporter export.SpanExporter) *SimpleSpanProcessor {
ssp := &SimpleSpanProcessor{ ssp := &SimpleSpanProcessor{
e: e, e: exporter,
} }
return ssp return ssp
} }
@ -44,7 +45,9 @@ func (ssp *SimpleSpanProcessor) OnStart(sd *export.SpanData) {
// OnEnd method exports SpanData using associated export. // OnEnd method exports SpanData using associated export.
func (ssp *SimpleSpanProcessor) OnEnd(sd *export.SpanData) { func (ssp *SimpleSpanProcessor) OnEnd(sd *export.SpanData) {
if ssp.e != nil && sd.SpanContext.IsSampled() { if ssp.e != nil && sd.SpanContext.IsSampled() {
ssp.e.ExportSpan(context.Background(), sd) if err := ssp.e.ExportSpans(context.Background(), []*export.SpanData{sd}); err != nil {
global.Handle(err)
}
} }
} }

View File

@ -27,11 +27,14 @@ type testExporter struct {
spans []*export.SpanData spans []*export.SpanData
} }
func (t *testExporter) ExportSpan(ctx context.Context, s *export.SpanData) { func (t *testExporter) ExportSpans(ctx context.Context, spans []*export.SpanData) error {
t.spans = append(t.spans, s) t.spans = append(t.spans, spans...)
return nil
} }
var _ export.SpanSyncer = (*testExporter)(nil) func (t *testExporter) Shutdown(context.Context) error { return nil }
var _ export.SpanExporter = (*testExporter)(nil)
func TestNewSimpleSpanProcessor(t *testing.T) { func TestNewSimpleSpanProcessor(t *testing.T) {
ssp := sdktrace.NewSimpleSpanProcessor(&testExporter{}) ssp := sdktrace.NewSimpleSpanProcessor(&testExporter{})

View File

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"math" "math"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
@ -59,10 +60,7 @@ func init() {
} }
func TestTracerFollowsExpectedAPIBehaviour(t *testing.T) { func TestTracerFollowsExpectedAPIBehaviour(t *testing.T) {
tp, err := NewProvider(WithConfig(Config{DefaultSampler: TraceIDRatioBased(0)})) tp := NewProvider(WithConfig(Config{DefaultSampler: TraceIDRatioBased(0)}))
if err != nil {
t.Fatalf("failed to create provider, err: %v\n", err)
}
harness := apitest.NewHarness(t) harness := apitest.NewHarness(t)
subjectFactory := func() trace.Tracer { subjectFactory := func() trace.Tracer {
return tp.Tracer("") return tp.Tracer("")
@ -72,11 +70,63 @@ func TestTracerFollowsExpectedAPIBehaviour(t *testing.T) {
} }
type testExporter struct { type testExporter struct {
mu sync.RWMutex
idx map[string]int
spans []*export.SpanData spans []*export.SpanData
} }
func (t *testExporter) ExportSpan(ctx context.Context, d *export.SpanData) { func NewTestExporter() *testExporter {
t.spans = append(t.spans, d) return &testExporter{idx: make(map[string]int)}
}
func (te *testExporter) ExportSpans(_ context.Context, spans []*export.SpanData) error {
te.mu.Lock()
defer te.mu.Unlock()
i := len(te.spans)
for _, s := range spans {
te.idx[s.Name] = i
te.spans = append(te.spans, s)
i++
}
return nil
}
func (te *testExporter) Spans() []*export.SpanData {
te.mu.RLock()
defer te.mu.RUnlock()
cp := make([]*export.SpanData, len(te.spans))
copy(cp, te.spans)
return cp
}
func (te *testExporter) GetSpan(name string) (*export.SpanData, bool) {
te.mu.RLock()
defer te.mu.RUnlock()
i, ok := te.idx[name]
if !ok {
return nil, false
}
return te.spans[i], true
}
func (te *testExporter) Len() int {
te.mu.RLock()
defer te.mu.RUnlock()
return len(te.spans)
}
func (te *testExporter) Shutdown(context.Context) error {
te.Reset()
return nil
}
func (te *testExporter) Reset() {
te.mu.Lock()
defer te.mu.Unlock()
te.idx = make(map[string]int)
te.spans = te.spans[:0]
} }
type testSampler struct { type testSampler struct {
@ -101,7 +151,7 @@ func (ts testSampler) Description() string {
func TestSetName(t *testing.T) { func TestSetName(t *testing.T) {
fooSampler := &testSampler{prefix: "foo", t: t} fooSampler := &testSampler{prefix: "foo", t: t}
tp, _ := NewProvider(WithConfig(Config{DefaultSampler: fooSampler})) tp := NewProvider(WithConfig(Config{DefaultSampler: fooSampler}))
type testCase struct { type testCase struct {
name string name string
@ -156,7 +206,7 @@ func TestSetName(t *testing.T) {
} }
func TestRecordingIsOn(t *testing.T) { func TestRecordingIsOn(t *testing.T) {
tp, _ := NewProvider() tp := NewProvider()
_, span := tp.Tracer("Recording on").Start(context.Background(), "StartSpan") _, span := tp.Tracer("Recording on").Start(context.Background(), "StartSpan")
defer span.End() defer span.End()
if span.IsRecording() == false { if span.IsRecording() == false {
@ -209,10 +259,7 @@ func TestSampling(t *testing.T) {
tc := tc tc := tc
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
t.Parallel() t.Parallel()
p, err := NewProvider(WithConfig(Config{DefaultSampler: tc.sampler})) p := NewProvider(WithConfig(Config{DefaultSampler: tc.sampler}))
if err != nil {
t.Fatal("unexpected error:", err)
}
tr := p.Tracer("test") tr := p.Tracer("test")
var sampled int var sampled int
for i := 0; i < total; i++ { for i := 0; i < total; i++ {
@ -250,7 +297,7 @@ func TestSampling(t *testing.T) {
} }
func TestStartSpanWithParent(t *testing.T) { func TestStartSpanWithParent(t *testing.T) {
tp, _ := NewProvider() tp := NewProvider()
tr := tp.Tracer("SpanWithParent") tr := tp.Tracer("SpanWithParent")
ctx := context.Background() ctx := context.Background()
@ -293,8 +340,8 @@ func TestStartSpanWithParent(t *testing.T) {
} }
func TestSetSpanAttributesOnStart(t *testing.T) { func TestSetSpanAttributesOnStart(t *testing.T) {
te := &testExporter{} te := NewTestExporter()
tp, _ := NewProvider(WithSyncer(te)) tp := NewProvider(WithSyncer(te))
span := startSpan(tp, span := startSpan(tp,
"StartSpanAttribute", "StartSpanAttribute",
apitrace.WithAttributes(label.String("key1", "value1")), apitrace.WithAttributes(label.String("key1", "value1")),
@ -326,8 +373,8 @@ func TestSetSpanAttributesOnStart(t *testing.T) {
} }
func TestSetSpanAttributes(t *testing.T) { func TestSetSpanAttributes(t *testing.T) {
te := &testExporter{} te := NewTestExporter()
tp, _ := NewProvider(WithSyncer(te)) tp := NewProvider(WithSyncer(te))
span := startSpan(tp, "SpanAttribute") span := startSpan(tp, "SpanAttribute")
span.SetAttributes(label.String("key1", "value1")) span.SetAttributes(label.String("key1", "value1"))
got, err := endSpan(te, span) got, err := endSpan(te, span)
@ -355,9 +402,9 @@ func TestSetSpanAttributes(t *testing.T) {
} }
func TestSetSpanAttributesOverLimit(t *testing.T) { func TestSetSpanAttributesOverLimit(t *testing.T) {
te := &testExporter{} te := NewTestExporter()
cfg := Config{MaxAttributesPerSpan: 2} cfg := Config{MaxAttributesPerSpan: 2}
tp, _ := NewProvider(WithConfig(cfg), WithSyncer(te)) tp := NewProvider(WithConfig(cfg), WithSyncer(te))
span := startSpan(tp, "SpanAttributesOverLimit") span := startSpan(tp, "SpanAttributesOverLimit")
span.SetAttributes( span.SetAttributes(
@ -393,8 +440,8 @@ func TestSetSpanAttributesOverLimit(t *testing.T) {
} }
func TestEvents(t *testing.T) { func TestEvents(t *testing.T) {
te := &testExporter{} te := NewTestExporter()
tp, _ := NewProvider(WithSyncer(te)) tp := NewProvider(WithSyncer(te))
span := startSpan(tp, "Events") span := startSpan(tp, "Events")
k1v1 := label.String("key1", "value1") k1v1 := label.String("key1", "value1")
@ -438,9 +485,9 @@ func TestEvents(t *testing.T) {
} }
func TestEventsOverLimit(t *testing.T) { func TestEventsOverLimit(t *testing.T) {
te := &testExporter{} te := NewTestExporter()
cfg := Config{MaxEventsPerSpan: 2} cfg := Config{MaxEventsPerSpan: 2}
tp, _ := NewProvider(WithConfig(cfg), WithSyncer(te)) tp := NewProvider(WithConfig(cfg), WithSyncer(te))
span := startSpan(tp, "EventsOverLimit") span := startSpan(tp, "EventsOverLimit")
k1v1 := label.String("key1", "value1") k1v1 := label.String("key1", "value1")
@ -490,8 +537,8 @@ func TestEventsOverLimit(t *testing.T) {
} }
func TestLinks(t *testing.T) { func TestLinks(t *testing.T) {
te := &testExporter{} te := NewTestExporter()
tp, _ := NewProvider(WithSyncer(te)) tp := NewProvider(WithSyncer(te))
k1v1 := label.String("key1", "value1") k1v1 := label.String("key1", "value1")
k2v2 := label.String("key2", "value2") k2v2 := label.String("key2", "value2")
@ -529,14 +576,14 @@ func TestLinks(t *testing.T) {
} }
func TestLinksOverLimit(t *testing.T) { func TestLinksOverLimit(t *testing.T) {
te := &testExporter{} te := NewTestExporter()
cfg := Config{MaxLinksPerSpan: 2} cfg := Config{MaxLinksPerSpan: 2}
sc1 := apitrace.SpanContext{TraceID: apitrace.ID([16]byte{1, 1}), SpanID: apitrace.SpanID{3}} sc1 := apitrace.SpanContext{TraceID: apitrace.ID([16]byte{1, 1}), SpanID: apitrace.SpanID{3}}
sc2 := apitrace.SpanContext{TraceID: apitrace.ID([16]byte{1, 1}), SpanID: apitrace.SpanID{3}} sc2 := apitrace.SpanContext{TraceID: apitrace.ID([16]byte{1, 1}), SpanID: apitrace.SpanID{3}}
sc3 := apitrace.SpanContext{TraceID: apitrace.ID([16]byte{1, 1}), SpanID: apitrace.SpanID{3}} sc3 := apitrace.SpanContext{TraceID: apitrace.ID([16]byte{1, 1}), SpanID: apitrace.SpanID{3}}
tp, _ := NewProvider(WithConfig(cfg), WithSyncer(te)) tp := NewProvider(WithConfig(cfg), WithSyncer(te))
span := startSpan(tp, "LinksOverLimit", span := startSpan(tp, "LinksOverLimit",
apitrace.WithLinks( apitrace.WithLinks(
@ -576,8 +623,8 @@ func TestLinksOverLimit(t *testing.T) {
} }
func TestSetSpanName(t *testing.T) { func TestSetSpanName(t *testing.T) {
te := &testExporter{} te := NewTestExporter()
tp, _ := NewProvider(WithSyncer(te)) tp := NewProvider(WithSyncer(te))
ctx := context.Background() ctx := context.Background()
want := "SpanName-1" want := "SpanName-1"
@ -598,8 +645,8 @@ func TestSetSpanName(t *testing.T) {
} }
func TestSetSpanStatus(t *testing.T) { func TestSetSpanStatus(t *testing.T) {
te := &testExporter{} te := NewTestExporter()
tp, _ := NewProvider(WithSyncer(te)) tp := NewProvider(WithSyncer(te))
span := startSpan(tp, "SpanStatus") span := startSpan(tp, "SpanStatus")
span.SetStatus(otelcodes.Canceled, "canceled") span.SetStatus(otelcodes.Canceled, "canceled")
@ -702,10 +749,10 @@ func endSpan(te *testExporter, span apitrace.Span) (*export.SpanData, error) {
return nil, fmt.Errorf("IsSampled: got false, want true") return nil, fmt.Errorf("IsSampled: got false, want true")
} }
span.End() span.End()
if len(te.spans) != 1 { if te.Len() != 1 {
return nil, fmt.Errorf("got exported spans %#v, want one span", te.spans) return nil, fmt.Errorf("got %d exported spans, want one span", te.Len())
} }
got := te.spans[0] got := te.Spans()[0]
if !got.SpanContext.SpanID.IsValid() { if !got.SpanContext.SpanID.IsValid() {
return nil, fmt.Errorf("exporting span: expected nonzero SpanID") return nil, fmt.Errorf("exporting span: expected nonzero SpanID")
} }
@ -728,27 +775,21 @@ func checkTime(x *time.Time) bool {
return true return true
} }
type fakeExporter map[string]*export.SpanData
func (f fakeExporter) ExportSpan(ctx context.Context, s *export.SpanData) {
f[s.Name] = s
}
func TestEndSpanTwice(t *testing.T) { func TestEndSpanTwice(t *testing.T) {
spans := make(fakeExporter) te := NewTestExporter()
tp, _ := NewProvider(WithSyncer(spans)) tp := NewProvider(WithSyncer(te))
span := startSpan(tp, "EndSpanTwice") span := startSpan(tp, "EndSpanTwice")
span.End() span.End()
span.End() span.End()
if len(spans) != 1 { if te.Len() != 1 {
t.Fatalf("expected only a single span, got %#v", spans) t.Fatalf("expected only a single span, got %#v", te.Spans())
} }
} }
func TestStartSpanAfterEnd(t *testing.T) { func TestStartSpanAfterEnd(t *testing.T) {
spans := make(fakeExporter) te := NewTestExporter()
tp, _ := NewProvider(WithConfig(Config{DefaultSampler: AlwaysSample()}), WithSyncer(spans)) tp := NewProvider(WithConfig(Config{DefaultSampler: AlwaysSample()}), WithSyncer(te))
ctx := context.Background() ctx := context.Background()
tr := tp.Tracer("SpanAfterEnd") tr := tp.Tracer("SpanAfterEnd")
@ -760,19 +801,19 @@ func TestStartSpanAfterEnd(t *testing.T) {
_, span2 := tr.Start(ctx1, "span-2") _, span2 := tr.Start(ctx1, "span-2")
span2.End() span2.End()
span0.End() span0.End()
if got, want := len(spans), 3; got != want { if got, want := te.Len(), 3; got != want {
t.Fatalf("len(%#v) = %d; want %d", spans, got, want) t.Fatalf("len(%#v) = %d; want %d", te.Spans(), got, want)
} }
gotParent, ok := spans["parent"] gotParent, ok := te.GetSpan("parent")
if !ok { if !ok {
t.Fatal("parent not recorded") t.Fatal("parent not recorded")
} }
gotSpan1, ok := spans["span-1"] gotSpan1, ok := te.GetSpan("span-1")
if !ok { if !ok {
t.Fatal("span-1 not recorded") t.Fatal("span-1 not recorded")
} }
gotSpan2, ok := spans["span-2"] gotSpan2, ok := te.GetSpan("span-2")
if !ok { if !ok {
t.Fatal("span-2 not recorded") t.Fatal("span-2 not recorded")
} }
@ -792,8 +833,8 @@ func TestStartSpanAfterEnd(t *testing.T) {
} }
func TestChildSpanCount(t *testing.T) { func TestChildSpanCount(t *testing.T) {
spans := make(fakeExporter) te := NewTestExporter()
tp, _ := NewProvider(WithConfig(Config{DefaultSampler: AlwaysSample()}), WithSyncer(spans)) tp := NewProvider(WithConfig(Config{DefaultSampler: AlwaysSample()}), WithSyncer(te))
tr := tp.Tracer("ChidSpanCount") tr := tp.Tracer("ChidSpanCount")
ctx, span0 := tr.Start(context.Background(), "parent") ctx, span0 := tr.Start(context.Background(), "parent")
@ -805,38 +846,38 @@ func TestChildSpanCount(t *testing.T) {
_, span3 := tr.Start(ctx, "span-3") _, span3 := tr.Start(ctx, "span-3")
span3.End() span3.End()
span0.End() span0.End()
if got, want := len(spans), 4; got != want { if got, want := te.Len(), 4; got != want {
t.Fatalf("len(%#v) = %d; want %d", spans, got, want) t.Fatalf("len(%#v) = %d; want %d", te.Spans(), got, want)
} }
gotParent, ok := spans["parent"] gotParent, ok := te.GetSpan("parent")
if !ok { if !ok {
t.Fatal("parent not recorded") t.Fatal("parent not recorded")
} }
gotSpan1, ok := spans["span-1"] gotSpan1, ok := te.GetSpan("span-1")
if !ok { if !ok {
t.Fatal("span-1 not recorded") t.Fatal("span-1 not recorded")
} }
gotSpan2, ok := spans["span-2"] gotSpan2, ok := te.GetSpan("span-2")
if !ok { if !ok {
t.Fatal("span-2 not recorded") t.Fatal("span-2 not recorded")
} }
gotSpan3, ok := spans["span-3"] gotSpan3, ok := te.GetSpan("span-3")
if !ok { if !ok {
t.Fatal("span-3 not recorded") t.Fatal("span-3 not recorded")
} }
if got, want := gotSpan3.ChildSpanCount, 0; got != want { if got, want := gotSpan3.ChildSpanCount, 0; got != want {
t.Errorf("span-3.ChildSpanCount=%q; want %q", got, want) t.Errorf("span-3.ChildSpanCount=%d; want %d", got, want)
} }
if got, want := gotSpan2.ChildSpanCount, 0; got != want { if got, want := gotSpan2.ChildSpanCount, 0; got != want {
t.Errorf("span-2.ChildSpanCount=%q; want %q", got, want) t.Errorf("span-2.ChildSpanCount=%d; want %d", got, want)
} }
if got, want := gotSpan1.ChildSpanCount, 1; got != want { if got, want := gotSpan1.ChildSpanCount, 1; got != want {
t.Errorf("span-1.ChildSpanCount=%q; want %q", got, want) t.Errorf("span-1.ChildSpanCount=%d; want %d", got, want)
} }
if got, want := gotParent.ChildSpanCount, 2; got != want { if got, want := gotParent.ChildSpanCount, 2; got != want {
t.Errorf("parent.ChildSpanCount=%q; want %q", got, want) t.Errorf("parent.ChildSpanCount=%d; want %d", got, want)
} }
} }
@ -847,7 +888,7 @@ func TestNilSpanEnd(t *testing.T) {
func TestExecutionTracerTaskEnd(t *testing.T) { func TestExecutionTracerTaskEnd(t *testing.T) {
var n uint64 var n uint64
tp, _ := NewProvider(WithConfig(Config{DefaultSampler: NeverSample()})) tp := NewProvider(WithConfig(Config{DefaultSampler: NeverSample()}))
tr := tp.Tracer("Execution Tracer Task End") tr := tp.Tracer("Execution Tracer Task End")
executionTracerTaskEnd := func() { executionTracerTaskEnd := func() {
@ -895,8 +936,8 @@ func TestExecutionTracerTaskEnd(t *testing.T) {
} }
func TestCustomStartEndTime(t *testing.T) { func TestCustomStartEndTime(t *testing.T) {
var te testExporter te := NewTestExporter()
tp, _ := NewProvider(WithSyncer(&te), WithConfig(Config{DefaultSampler: AlwaysSample()})) tp := NewProvider(WithSyncer(te), WithConfig(Config{DefaultSampler: AlwaysSample()}))
startTime := time.Date(2019, time.August, 27, 14, 42, 0, 0, time.UTC) startTime := time.Date(2019, time.August, 27, 14, 42, 0, 0, time.UTC)
endTime := startTime.Add(time.Second * 20) endTime := startTime.Add(time.Second * 20)
@ -907,10 +948,10 @@ func TestCustomStartEndTime(t *testing.T) {
) )
span.End(apitrace.WithTimestamp(endTime)) span.End(apitrace.WithTimestamp(endTime))
if len(te.spans) != 1 { if te.Len() != 1 {
t.Fatalf("got exported spans %#v, want one span", te.spans) t.Fatalf("got %d exported spans, want one span", te.Len())
} }
got := te.spans[0] got := te.Spans()[0]
if got.StartTime != startTime { if got.StartTime != startTime {
t.Errorf("expected start time to be %s, got %s", startTime, got.StartTime) t.Errorf("expected start time to be %s, got %s", startTime, got.StartTime)
} }
@ -938,8 +979,8 @@ func TestRecordError(t *testing.T) {
} }
for _, s := range scenarios { for _, s := range scenarios {
te := &testExporter{} te := NewTestExporter()
tp, _ := NewProvider(WithSyncer(te)) tp := NewProvider(WithSyncer(te))
span := startSpan(tp, "RecordError") span := startSpan(tp, "RecordError")
errTime := time.Now() errTime := time.Now()
@ -980,8 +1021,8 @@ func TestRecordError(t *testing.T) {
} }
func TestRecordErrorWithStatus(t *testing.T) { func TestRecordErrorWithStatus(t *testing.T) {
te := &testExporter{} te := NewTestExporter()
tp, _ := NewProvider(WithSyncer(te)) tp := NewProvider(WithSyncer(te))
span := startSpan(tp, "RecordErrorWithStatus") span := startSpan(tp, "RecordErrorWithStatus")
testErr := ottest.NewTestError("test error") testErr := ottest.NewTestError("test error")
@ -1026,8 +1067,8 @@ func TestRecordErrorWithStatus(t *testing.T) {
} }
func TestRecordErrorNil(t *testing.T) { func TestRecordErrorNil(t *testing.T) {
te := &testExporter{} te := NewTestExporter()
tp, _ := NewProvider(WithSyncer(te)) tp := NewProvider(WithSyncer(te))
span := startSpan(tp, "RecordErrorNil") span := startSpan(tp, "RecordErrorNil")
span.RecordError(context.Background(), nil) span.RecordError(context.Background(), nil)
@ -1056,12 +1097,12 @@ func TestRecordErrorNil(t *testing.T) {
} }
func TestWithSpanKind(t *testing.T) { func TestWithSpanKind(t *testing.T) {
var te testExporter te := NewTestExporter()
tp, _ := NewProvider(WithSyncer(&te), WithConfig(Config{DefaultSampler: AlwaysSample()})) tp := NewProvider(WithSyncer(te), WithConfig(Config{DefaultSampler: AlwaysSample()}))
tr := tp.Tracer("withSpanKind") tr := tp.Tracer("withSpanKind")
_, span := tr.Start(context.Background(), "WithoutSpanKind") _, span := tr.Start(context.Background(), "WithoutSpanKind")
spanData, err := endSpan(&te, span) spanData, err := endSpan(te, span)
if err != nil { if err != nil {
t.Error(err.Error()) t.Error(err.Error())
} }
@ -1079,10 +1120,10 @@ func TestWithSpanKind(t *testing.T) {
} }
for _, sk := range sks { for _, sk := range sks {
te.spans = nil te.Reset()
_, span := tr.Start(context.Background(), fmt.Sprintf("SpanKind-%v", sk), apitrace.WithSpanKind(sk)) _, span := tr.Start(context.Background(), fmt.Sprintf("SpanKind-%v", sk), apitrace.WithSpanKind(sk))
spanData, err := endSpan(&te, span) spanData, err := endSpan(te, span)
if err != nil { if err != nil {
t.Error(err.Error()) t.Error(err.Error())
} }
@ -1094,13 +1135,13 @@ func TestWithSpanKind(t *testing.T) {
} }
func TestWithResource(t *testing.T) { func TestWithResource(t *testing.T) {
var te testExporter te := NewTestExporter()
tp, _ := NewProvider(WithSyncer(&te), tp := NewProvider(WithSyncer(te),
WithConfig(Config{DefaultSampler: AlwaysSample()}), WithConfig(Config{DefaultSampler: AlwaysSample()}),
WithResource(resource.New(label.String("rk1", "rv1"), label.Int64("rk2", 5)))) WithResource(resource.New(label.String("rk1", "rv1"), label.Int64("rk2", 5))))
span := startSpan(tp, "WithResource") span := startSpan(tp, "WithResource")
span.SetAttributes(label.String("key1", "value1")) span.SetAttributes(label.String("key1", "value1"))
got, err := endSpan(&te, span) got, err := endSpan(te, span)
if err != nil { if err != nil {
t.Error(err.Error()) t.Error(err.Error())
} }
@ -1126,8 +1167,8 @@ func TestWithResource(t *testing.T) {
} }
func TestWithInstrumentationVersion(t *testing.T) { func TestWithInstrumentationVersion(t *testing.T) {
var te testExporter te := NewTestExporter()
tp, _ := NewProvider(WithSyncer(&te)) tp := NewProvider(WithSyncer(te))
ctx := context.Background() ctx := context.Background()
ctx = apitrace.ContextWithRemoteSpanContext(ctx, remoteSpanContext()) ctx = apitrace.ContextWithRemoteSpanContext(ctx, remoteSpanContext())
@ -1135,7 +1176,7 @@ func TestWithInstrumentationVersion(t *testing.T) {
"WithInstrumentationVersion", "WithInstrumentationVersion",
apitrace.WithInstrumentationVersion("v0.1.0"), apitrace.WithInstrumentationVersion("v0.1.0"),
).Start(ctx, "span0", apitrace.WithRecord()) ).Start(ctx, "span0", apitrace.WithRecord())
got, err := endSpan(&te, span) got, err := endSpan(te, span)
if err != nil { if err != nil {
t.Error(err.Error()) t.Error(err.Error())
} }
@ -1160,8 +1201,8 @@ func TestWithInstrumentationVersion(t *testing.T) {
} }
func TestSpanCapturesPanic(t *testing.T) { func TestSpanCapturesPanic(t *testing.T) {
var te testExporter te := NewTestExporter()
tp, _ := NewProvider(WithSyncer(&te)) tp := NewProvider(WithSyncer(te))
_, span := tp.Tracer("CatchPanic").Start( _, span := tp.Tracer("CatchPanic").Start(
context.Background(), context.Background(),
"span", "span",
@ -1173,10 +1214,11 @@ func TestSpanCapturesPanic(t *testing.T) {
panic(errors.New("error message")) panic(errors.New("error message"))
} }
require.PanicsWithError(t, "error message", f) require.PanicsWithError(t, "error message", f)
require.Len(t, te.spans, 1) spans := te.Spans()
require.Len(t, te.spans[0].MessageEvents, 1) require.Len(t, spans, 1)
assert.Equal(t, te.spans[0].MessageEvents[0].Name, errorEventName) require.Len(t, spans[0].MessageEvents, 1)
assert.Equal(t, te.spans[0].MessageEvents[0].Attributes, []label.KeyValue{ assert.Equal(t, spans[0].MessageEvents[0].Name, errorEventName)
assert.Equal(t, spans[0].MessageEvents[0].Attributes, []label.KeyValue{
errorTypeKey.String("*errors.errorString"), errorTypeKey.String("*errors.errorString"),
errorMessageKey.String("error message"), errorMessageKey.String("error message"),
}) })

View File

@ -23,9 +23,6 @@ import (
var testConfig = sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()} var testConfig = sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}
func basicProvider(t *testing.T) *sdktrace.Provider { func basicProvider(t *testing.T) *sdktrace.Provider {
tp, err := sdktrace.NewProvider(sdktrace.WithConfig(testConfig)) tp := sdktrace.NewProvider(sdktrace.WithConfig(testConfig))
if err != nil {
t.Fatalf("failed to create provider, err: %v\n", err)
}
return tp return tp
} }