1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-06-02 22:57:27 +02:00

Unify trace and metric exporter helpers (#944)

* Adjust Jaeger and Zipkin exporters helper methods

* Update and add tests, examples, various minor improvements

* Update changelog

* Correct the Zipkin example

- wait for the spans to be exported
- rebuild the example

* Zipkin service name as argument

* Rework Jaeger and Zipkin tests

* Include more detailed Changelog

Co-authored-by: ET <evantorrie@users.noreply.github.com>
Co-authored-by: Liz Fong-Jones <lizf@honeycomb.io>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
Matej Gera 2020-07-22 20:57:48 +02:00 committed by GitHub
parent c5d77d234c
commit f31d8ec1d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 452 additions and 163 deletions

View File

@ -8,6 +8,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## [Unreleased]
### Changed
- Jaeger exporter helpers: added InstallNewPipeline and removed RegisterGlobal option instead. (#944)
- Zipkin exporter helpers: pipeline methods introduced, new exporter method adjusted. (#944)
## [0.9.0] - 2020-07-20
### Added

View File

@ -30,7 +30,7 @@ import (
// initTracer creates a new trace provider instance and registers it as global trace provider.
func initTracer() func() {
// Create and install Jaeger export pipeline
_, flush, err := jaeger.NewExportPipeline(
flush, err := jaeger.InstallNewPipeline(
jaeger.WithCollectorEndpoint("http://localhost:14268/api/traces"),
jaeger.WithProcess(jaeger.Process{
ServiceName: "trace-demo",
@ -39,7 +39,6 @@ func initTracer() func() {
kv.Float64("float", 312.23),
},
}),
jaeger.RegisterAsGlobal(),
jaeger.WithSDK(&sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
)
if err != nil {

View File

@ -33,30 +33,20 @@ var logger = log.New(os.Stderr, "zipkin-example", log.Ldate|log.Ltime|log.Llongf
// initTracer creates a new trace provider instance and registers it as global trace provider.
func initTracer(url string) {
// Create Zipkin Exporter
exporter, err := zipkin.NewExporter(
url,
"zipkin-example",
zipkin.WithLogger(logger),
)
if err != nil {
log.Fatal(err)
}
// Create Zipkin Exporter and install it as a global tracer.
//
// For demoing purposes, always sample. In a production application, you should
// configure this to a trace.ProbabilitySampler set at the desired
// configure the sampler to a trace.ProbabilitySampler set at the desired
// probability.
tp, err := sdktrace.NewProvider(
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithBatcher(exporter,
sdktrace.WithBatchTimeout(5),
sdktrace.WithMaxExportBatchSize(10),
),
err := zipkin.InstallNewPipeline(
url,
"zipkin-test",
zipkin.WithLogger(logger),
zipkin.WithSDK(&sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
)
if err != nil {
log.Fatal(err)
}
global.SetTraceProvider(tp)
}
func main() {
@ -73,7 +63,9 @@ func main() {
bar(ctx)
<-time.After(6 * time.Millisecond)
span.End()
<-time.After(24 * time.Millisecond)
// Wait for the spans to be exported.
<-time.After(5 * time.Second)
}
func bar(ctx context.Context) {

View File

@ -47,10 +47,6 @@ type options struct {
Config *sdktrace.Config
// RegisterGlobal is set to true if the trace provider of the new pipeline should be
// registered as Global Trace Provider
RegisterGlobal bool
Disabled bool
}
@ -82,14 +78,8 @@ func WithSDK(config *sdktrace.Config) Option {
}
}
// RegisterAsGlobal enables the registration of the trace provider of the new pipeline
// as Global Trace Provider.
func RegisterAsGlobal() Option {
return func(o *options) {
o.RegisterGlobal = true
}
}
// WithDisabled option will cause pipeline methods to use
// a no-op provider
func WithDisabled(disabled bool) Option {
return func(o *options) {
o.Disabled = disabled
@ -177,13 +167,22 @@ func NewExportPipeline(endpointOption EndpointOption, opts ...Option) (apitrace.
if exporter.o.Config != nil {
tp.ApplyConfig(*exporter.o.Config)
}
if exporter.o.RegisterGlobal {
global.SetTraceProvider(tp)
}
return tp, exporter.Flush, nil
}
// InstallNewPipeline instantiates a NewExportPipeline with the
// recommended configuration and registers it globally.
func InstallNewPipeline(endpointOption EndpointOption, opts ...Option) (func(), error) {
tp, flushFn, err := NewExportPipeline(endpointOption, opts...)
if err != nil {
return nil, err
}
global.SetTraceProvider(tp)
return flushFn, nil
}
// Process contains the information exported to jaeger about the source
// of the trace data.
type Process struct {

View File

@ -26,10 +26,12 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/api/support/bundler"
"google.golang.org/grpc/codes"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/trace"
apitrace "go.opentelemetry.io/otel/api/trace"
gen "go.opentelemetry.io/otel/exporters/trace/jaeger/internal/gen-go/jaeger"
ottest "go.opentelemetry.io/otel/internal/testing"
@ -38,77 +40,251 @@ import (
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
func TestNewExporterPipelineWithRegistration(t *testing.T) {
tp, fn, err := NewExportPipeline(
WithCollectorEndpoint("http://localhost:14268/api/traces"),
RegisterAsGlobal(),
)
defer fn()
assert.NoError(t, err)
assert.Same(t, tp, global.TraceProvider())
const (
collectorEndpoint = "http://localhost:14268/api/traces"
agentEndpoint = "localhost:6831"
)
func TestInstallNewPipeline(t *testing.T) {
testCases := []struct {
name string
endpoint EndpointOption
options []Option
expectedProvider trace.Provider
}{
{
name: "simple pipeline",
endpoint: WithCollectorEndpoint(collectorEndpoint),
expectedProvider: &sdktrace.Provider{},
},
{
name: "with agent endpoint",
endpoint: WithAgentEndpoint(agentEndpoint),
expectedProvider: &sdktrace.Provider{},
},
{
name: "with disabled",
endpoint: WithCollectorEndpoint(collectorEndpoint),
options: []Option{
WithDisabled(true),
},
expectedProvider: &apitrace.NoopProvider{},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fn, err := InstallNewPipeline(
tc.endpoint,
tc.options...,
)
defer fn()
assert.NoError(t, err)
assert.IsType(t, tc.expectedProvider, global.TraceProvider())
global.SetTraceProvider(nil)
})
}
}
func TestNewExporterPipelineWithoutRegistration(t *testing.T) {
tp, fn, err := NewExportPipeline(
WithCollectorEndpoint("http://localhost:14268/api/traces"),
)
defer fn()
assert.NoError(t, err)
assert.NotEqual(t, tp, global.TraceProvider())
func TestNewExportPipeline(t *testing.T) {
testCases := []struct {
name string
endpoint EndpointOption
options []Option
expectedProviderType trace.Provider
testSpanSampling, spanShouldBeSampled bool
}{
{
name: "simple pipeline",
endpoint: WithCollectorEndpoint(collectorEndpoint),
expectedProviderType: &sdktrace.Provider{},
},
{
name: "with disabled",
endpoint: WithCollectorEndpoint(collectorEndpoint),
options: []Option{
WithDisabled(true),
},
expectedProviderType: &apitrace.NoopProvider{},
},
{
name: "always on",
endpoint: WithCollectorEndpoint(collectorEndpoint),
options: []Option{
WithSDK(&sdktrace.Config{
DefaultSampler: sdktrace.AlwaysSample(),
}),
},
expectedProviderType: &sdktrace.Provider{},
testSpanSampling: true,
spanShouldBeSampled: true,
},
{
name: "never",
endpoint: WithCollectorEndpoint(collectorEndpoint),
options: []Option{
WithSDK(&sdktrace.Config{
DefaultSampler: sdktrace.NeverSample(),
}),
},
expectedProviderType: &sdktrace.Provider{},
testSpanSampling: true,
spanShouldBeSampled: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tp, fn, err := NewExportPipeline(
tc.endpoint,
tc.options...,
)
defer fn()
assert.NoError(t, err)
assert.NotEqual(t, tp, global.TraceProvider())
assert.IsType(t, tc.expectedProviderType, tp)
if tc.testSpanSampling {
_, span := tp.Tracer("jaeger test").Start(context.Background(), tc.name)
spanCtx := span.SpanContext()
assert.Equal(t, tc.spanShouldBeSampled, spanCtx.IsSampled())
span.End()
}
})
}
}
func TestNewExporterPipelineWithSDK(t *testing.T) {
tp, fn, err := NewExportPipeline(
WithCollectorEndpoint("http://localhost:14268/api/traces"),
WithSDK(&sdktrace.Config{
DefaultSampler: sdktrace.AlwaysSample(),
}),
)
defer fn()
assert.NoError(t, err)
_, span := tp.Tracer("jaeger test").Start(context.Background(), "always-on")
spanCtx := span.SpanContext()
assert.True(t, spanCtx.IsSampled())
span.End()
func TestNewExportPipelineWithDisabledFromEnv(t *testing.T) {
envStore, err := ottest.SetEnvVariables(map[string]string{
envDisabled: "true",
})
require.NoError(t, err)
envStore.Record(envDisabled)
defer func() {
require.NoError(t, envStore.Restore())
}()
tp2, fn, err := NewExportPipeline(
WithCollectorEndpoint("http://localhost:14268/api/traces"),
WithSDK(&sdktrace.Config{
DefaultSampler: sdktrace.NeverSample(),
}),
tp, fn, err := NewExportPipeline(
WithCollectorEndpoint(collectorEndpoint),
)
defer fn()
assert.NoError(t, err)
_, span2 := tp2.Tracer("jaeger test").Start(context.Background(), "never")
span2Ctx := span2.SpanContext()
assert.False(t, span2Ctx.IsSampled())
span2.End()
assert.IsType(t, &apitrace.NoopProvider{}, tp)
}
func TestNewRawExporter(t *testing.T) {
const (
collectorEndpoint = "http://localhost"
serviceName = "test-service"
tagKey = "key"
tagVal = "val"
)
// Create Jaeger Exporter
exp, err := NewRawExporter(
WithCollectorEndpoint(collectorEndpoint),
WithProcess(Process{
ServiceName: serviceName,
Tags: []kv.KeyValue{
kv.String(tagKey, tagVal),
testCases := []struct {
name string
endpoint EndpointOption
options []Option
expectedServiceName string
expectedTagsLen, expectedBufferMaxCount, expectedBatchMaxCount int
}{
{
name: "default exporter",
endpoint: WithCollectorEndpoint(collectorEndpoint),
expectedServiceName: defaultServiceName,
expectedBufferMaxCount: bundler.DefaultBufferedByteLimit,
expectedBatchMaxCount: bundler.DefaultBundleCountThreshold,
},
{
name: "default exporter with agent endpoint",
endpoint: WithAgentEndpoint(agentEndpoint),
expectedServiceName: defaultServiceName,
expectedBufferMaxCount: bundler.DefaultBufferedByteLimit,
expectedBatchMaxCount: bundler.DefaultBundleCountThreshold,
},
{
name: "with process",
endpoint: WithCollectorEndpoint(collectorEndpoint),
options: []Option{
WithProcess(
Process{
ServiceName: "jaeger-test",
Tags: []kv.KeyValue{
kv.String("key", "val"),
},
},
),
},
}),
)
expectedServiceName: "jaeger-test",
expectedTagsLen: 1,
expectedBufferMaxCount: bundler.DefaultBufferedByteLimit,
expectedBatchMaxCount: bundler.DefaultBundleCountThreshold,
},
{
name: "with buffer and batch max count",
endpoint: WithCollectorEndpoint(collectorEndpoint),
options: []Option{
WithProcess(
Process{
ServiceName: "jaeger-test",
},
),
WithBufferMaxCount(99),
WithBatchMaxCount(99),
},
expectedServiceName: "jaeger-test",
expectedBufferMaxCount: 99,
expectedBatchMaxCount: 99,
},
}
assert.NoError(t, err)
assert.EqualValues(t, serviceName, exp.process.ServiceName)
assert.Len(t, exp.process.Tags, 1)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
exp, err := NewRawExporter(
tc.endpoint,
tc.options...,
)
assert.NoError(t, err)
assert.Equal(t, tc.expectedServiceName, exp.process.ServiceName)
assert.Len(t, exp.process.Tags, tc.expectedTagsLen)
assert.Equal(t, tc.expectedBufferMaxCount, exp.bundler.BufferedByteLimit)
assert.Equal(t, tc.expectedBatchMaxCount, exp.bundler.BundleCountThreshold)
})
}
}
func TestNewRawExporterShouldFailIfCollectorEndpointEmpty(t *testing.T) {
func TestNewRawExporterShouldFail(t *testing.T) {
testCases := []struct {
name string
endpoint EndpointOption
expectedErrMsg string
}{
{
name: "with empty collector endpoint",
endpoint: WithCollectorEndpoint(""),
expectedErrMsg: "collectorEndpoint must not be empty",
},
{
name: "with empty agent endpoint",
endpoint: WithAgentEndpoint(""),
expectedErrMsg: "agentEndpoint must not be empty",
},
{
name: "with invalid agent endpoint",
endpoint: WithAgentEndpoint("localhost"),
expectedErrMsg: "address localhost: missing port in address",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, err := NewRawExporter(
tc.endpoint,
)
assert.Error(t, err)
assert.EqualError(t, err, tc.expectedErrMsg)
})
}
}
func TestNewRawExporterShouldFailIfCollectorUnset(t *testing.T) {
// Record and restore env
envStore := ottest.NewEnvStore()
envStore.Record(envEndpoint)
@ -179,29 +355,6 @@ func TestExporter_ExportSpan(t *testing.T) {
assert.True(t, len(tc.spansUploaded) == 1)
}
func TestNewRawExporterWithAgentEndpoint(t *testing.T) {
const agentEndpoint = "localhost:6831"
// Create Jaeger Exporter
_, err := NewRawExporter(
WithAgentEndpoint(agentEndpoint),
)
assert.NoError(t, err)
}
func TestNewRawExporterWithAgentShouldFailIfEndpointInvalid(t *testing.T) {
//empty
_, err := NewRawExporter(
WithAgentEndpoint(""),
)
assert.Error(t, err)
//invalid endpoint addr
_, err = NewRawExporter(
WithAgentEndpoint("http://localhost"),
)
assert.Error(t, err)
}
func Test_spanDataToThrift(t *testing.T) {
now := time.Now()
traceID, _ := apitrace.IDFromHex("0102030405060708090a0b0c0d0e0f10")
@ -319,31 +472,3 @@ func Test_spanDataToThrift(t *testing.T) {
})
}
}
func TestNewExporterPipelineWithDisabled(t *testing.T) {
tp, fn, err := NewExportPipeline(
WithCollectorEndpoint("http://localhost:14268/api/traces"),
WithDisabled(true),
)
defer fn()
assert.NoError(t, err)
assert.IsType(t, &apitrace.NoopProvider{}, tp)
}
func TestNewExporterPipelineWithDisabledFromEnv(t *testing.T) {
envStore, err := ottest.SetEnvVariables(map[string]string{
envDisabled: "true",
})
require.NoError(t, err)
envStore.Record(envDisabled)
defer func() {
require.NoError(t, envStore.Restore())
}()
tp, fn, err := NewExportPipeline(
WithCollectorEndpoint("http://localhost:14268/api/traces"),
)
defer fn()
assert.NoError(t, err)
assert.IsType(t, &apitrace.NoopProvider{}, tp)
}

View File

@ -0,0 +1,16 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package zipkin contains an OpenTelemetry tracing exporter for Zipkin.
package zipkin // import "go.opentelemetry.io/otel/exporters/trace/zipkin"

View File

@ -18,13 +18,16 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"go.opentelemetry.io/otel/api/global"
export "go.opentelemetry.io/otel/sdk/export/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
// Exporter exports SpanData to the zipkin collector. It implements
@ -35,6 +38,7 @@ type Exporter struct {
serviceName string
client *http.Client
logger *log.Logger
o options
}
var (
@ -42,51 +46,97 @@ var (
)
// Options contains configuration for the exporter.
type Options struct {
type options struct {
client *http.Client
logger *log.Logger
config *sdktrace.Config
}
// Option defines a function that configures the exporter.
type Option func(*Options)
type Option func(*options)
// WithLogger configures the exporter to use the passed logger.
func WithLogger(logger *log.Logger) Option {
return func(opts *Options) {
return func(opts *options) {
opts.logger = logger
}
}
// WithClient configures the exporter to use the passed HTTP client.
func WithClient(client *http.Client) Option {
return func(opts *Options) {
return func(opts *options) {
opts.client = client
}
}
// NewExporter creates a new zipkin exporter.
func NewExporter(collectorURL string, serviceName string, os ...Option) (*Exporter, error) {
if _, err := url.Parse(collectorURL); err != nil {
// WithSDK sets the SDK config for the exporter pipeline.
func WithSDK(config *sdktrace.Config) Option {
return func(o *options) {
o.config = config
}
}
// NewRawExporter creates a new Zipkin exporter.
func NewRawExporter(collectorURL, serviceName string, opts ...Option) (*Exporter, error) {
if collectorURL == "" {
return nil, errors.New("collector URL cannot be empty")
}
u, err := url.Parse(collectorURL)
if err != nil {
return nil, fmt.Errorf("invalid collector URL: %v", err)
}
if serviceName == "" {
return nil, fmt.Errorf("service name must be non-empty string")
if u.Scheme == "" || u.Host == "" {
return nil, errors.New("invalid collector URL")
}
opts := Options{}
for _, o := range os {
o(&opts)
o := options{}
for _, opt := range opts {
opt(&o)
}
if opts.client == nil {
opts.client = http.DefaultClient
if o.client == nil {
o.client = http.DefaultClient
}
return &Exporter{
url: collectorURL,
client: opts.client,
logger: opts.logger,
client: o.client,
logger: o.logger,
serviceName: serviceName,
o: o,
}, nil
}
// NewExportPipeline sets up a complete export pipeline
// with the recommended setup for trace provider
func NewExportPipeline(collectorURL, serviceName string, opts ...Option) (*sdktrace.Provider, error) {
exp, err := NewRawExporter(collectorURL, serviceName, opts...)
if err != nil {
return nil, err
}
batcher := sdktrace.WithBatcher(exp)
tp, err := sdktrace.NewProvider(batcher)
if err != nil {
return nil, err
}
if exp.o.config != nil {
tp.ApplyConfig(*exp.o.config)
}
return tp, err
}
// InstallNewPipeline instantiates a NewExportPipeline with the
// recommended configuration and registers it globally.
func InstallNewPipeline(collectorURL, serviceName string, opts ...Option) error {
tp, err := NewExportPipeline(collectorURL, serviceName, opts...)
if err != nil {
return err
}
global.SetTraceProvider(tp)
return nil
}
// ExportSpans is a part of an implementation of the SpanBatcher
// interface.
func (e *Exporter) ExportSpans(ctx context.Context, batch []*export.SpanData) {

View File

@ -27,13 +27,118 @@ import (
"time"
zkmodel "github.com/openzipkin/zipkin-go/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/trace"
export "go.opentelemetry.io/otel/sdk/export/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
const (
collectorURL = "http://localhost:9411/api/v2/spans"
serviceName = "zipkin-test"
)
func TestInstallNewPipeline(t *testing.T) {
err := InstallNewPipeline(
collectorURL,
serviceName,
)
assert.NoError(t, err)
assert.IsType(t, &sdktrace.Provider{}, global.TraceProvider())
}
func TestNewExportPipeline(t *testing.T) {
testCases := []struct {
name string
options []Option
testSpanSampling, spanShouldBeSampled bool
}{
{
name: "simple pipeline",
},
{
name: "always on",
options: []Option{
WithSDK(&sdktrace.Config{
DefaultSampler: sdktrace.AlwaysSample(),
}),
},
testSpanSampling: true,
spanShouldBeSampled: true,
},
{
name: "never",
options: []Option{
WithSDK(&sdktrace.Config{
DefaultSampler: sdktrace.NeverSample(),
}),
},
testSpanSampling: true,
spanShouldBeSampled: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tp, err := NewExportPipeline(
collectorURL,
serviceName,
tc.options...,
)
assert.NoError(t, err)
assert.NotEqual(t, tp, global.TraceProvider())
if tc.testSpanSampling {
_, span := tp.Tracer("zipkin test").Start(context.Background(), tc.name)
spanCtx := span.SpanContext()
assert.Equal(t, tc.spanShouldBeSampled, spanCtx.IsSampled())
span.End()
}
})
}
}
func TestNewRawExporter(t *testing.T) {
exp, err := NewRawExporter(
collectorURL,
serviceName,
)
assert.NoError(t, err)
assert.EqualValues(t, serviceName, exp.serviceName)
}
func TestNewRawExporterShouldFailInvalidCollectorURL(t *testing.T) {
var (
exp *Exporter
err error
)
// cannot be empty
exp, err = NewRawExporter(
"",
serviceName,
)
assert.Error(t, err)
assert.EqualError(t, err, "collector URL cannot be empty")
assert.Nil(t, exp)
// invalid URL
exp, err = NewRawExporter(
"localhost",
serviceName,
)
assert.Error(t, err)
assert.EqualError(t, err, "invalid collector URL")
assert.Nil(t, exp)
}
type mockZipkinCollector struct {
t *testing.T
url string
@ -230,9 +335,7 @@ func TestExportSpans(t *testing.T) {
defer collector.Close()
ls := &logStore{T: t}
logger := logStoreLogger(ls)
_, err := NewExporter(collector.url, "", WithLogger(logger))
require.Error(t, err, "service name must be non-empty string")
exporter, err := NewExporter(collector.url, "exporter-test", WithLogger(logger))
exporter, err := NewRawExporter(collector.url, "exporter-test", WithLogger(logger))
require.NoError(t, err)
ctx := context.Background()
require.Len(t, ls.Messages, 0)

View File

@ -29,7 +29,7 @@ type EnvStore interface {
// Records the environment variable into the store.
Record(key string)
// Restore recover the environment variables in the store.
// Restore recovers the environment variables in the store.
Restore() error
}