mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-05-13 21:56:48 +02:00
Add a split protocol driver for otlp exporter (#1418)
* Add a split protocol driver This is a wrapper around two other protocol drivers, so it makes it possible to send traces using a different protocol than the one used for metrics. * Add an example and tests for multi GRPC endpoint driver * Update changelog * Document the split driver
This commit is contained in:
parent
439cd31389
commit
38e76efe99
@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
|||||||
|
|
||||||
- Add the `ReadOnlySpan` and `ReadWriteSpan` interfaces to provide better control for accessing span data. (#1360)
|
- Add the `ReadOnlySpan` and `ReadWriteSpan` interfaces to provide better control for accessing span data. (#1360)
|
||||||
- `NewGRPCDriver` function returns a `ProtocolDriver` that maintains a single gRPC connection to the collector. (#1369)
|
- `NewGRPCDriver` function returns a `ProtocolDriver` that maintains a single gRPC connection to the collector. (#1369)
|
||||||
|
- `NewSplitDriver` for OTLP exporter that allows sending traces and metrics to different endpoints. (#1418)
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
|
|
||||||
|
@ -24,6 +24,10 @@ import (
|
|||||||
|
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
"go.opentelemetry.io/otel/exporters/otlp"
|
"go.opentelemetry.io/otel/exporters/otlp"
|
||||||
|
"go.opentelemetry.io/otel/metric"
|
||||||
|
"go.opentelemetry.io/otel/sdk/metric/controller/push"
|
||||||
|
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
|
||||||
|
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
|
||||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -51,6 +55,13 @@ func Example_insecure() {
|
|||||||
sdktrace.WithMaxExportBatchSize(10),
|
sdktrace.WithMaxExportBatchSize(10),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
defer func() {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := tp.Shutdown(ctx); err != nil {
|
||||||
|
otel.Handle(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
otel.SetTracerProvider(tp)
|
otel.SetTracerProvider(tp)
|
||||||
|
|
||||||
tracer := otel.Tracer("test-tracer")
|
tracer := otel.Tracer("test-tracer")
|
||||||
@ -97,6 +108,13 @@ func Example_withTLS() {
|
|||||||
sdktrace.WithMaxExportBatchSize(10),
|
sdktrace.WithMaxExportBatchSize(10),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
defer func() {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := tp.Shutdown(ctx); err != nil {
|
||||||
|
otel.Handle(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
otel.SetTracerProvider(tp)
|
otel.SetTracerProvider(tp)
|
||||||
|
|
||||||
tracer := otel.Tracer("test-tracer")
|
tracer := otel.Tracer("test-tracer")
|
||||||
@ -111,3 +129,91 @@ func Example_withTLS() {
|
|||||||
iSpan.End()
|
iSpan.End()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Example_withDifferentSignalCollectors() {
|
||||||
|
|
||||||
|
// Set different endpoints for the metrics and traces collectors
|
||||||
|
metricsDriver := otlp.NewGRPCDriver(
|
||||||
|
otlp.WithInsecure(),
|
||||||
|
otlp.WithAddress("localhost:30080"),
|
||||||
|
)
|
||||||
|
tracesDriver := otlp.NewGRPCDriver(
|
||||||
|
otlp.WithInsecure(),
|
||||||
|
otlp.WithAddress("localhost:30082"),
|
||||||
|
)
|
||||||
|
splitCfg := otlp.SplitConfig{
|
||||||
|
ForMetrics: metricsDriver,
|
||||||
|
ForTraces: tracesDriver,
|
||||||
|
}
|
||||||
|
driver := otlp.NewSplitDriver(splitCfg)
|
||||||
|
ctx := context.Background()
|
||||||
|
exp, err := otlp.NewExporter(ctx, driver)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("failed to create the collector exporter: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := exp.Shutdown(ctx); err != nil {
|
||||||
|
otel.Handle(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
tp := sdktrace.NewTracerProvider(
|
||||||
|
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
|
||||||
|
sdktrace.WithBatcher(
|
||||||
|
exp,
|
||||||
|
// add following two options to ensure flush
|
||||||
|
sdktrace.WithBatchTimeout(5),
|
||||||
|
sdktrace.WithMaxExportBatchSize(10),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := tp.Shutdown(ctx); err != nil {
|
||||||
|
otel.Handle(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
otel.SetTracerProvider(tp)
|
||||||
|
|
||||||
|
pusher := push.New(
|
||||||
|
basic.New(
|
||||||
|
simple.NewWithExactDistribution(),
|
||||||
|
exp,
|
||||||
|
),
|
||||||
|
exp,
|
||||||
|
push.WithPeriod(2*time.Second),
|
||||||
|
)
|
||||||
|
otel.SetMeterProvider(pusher.MeterProvider())
|
||||||
|
|
||||||
|
pusher.Start()
|
||||||
|
defer pusher.Stop() // pushes any last exports to the receiver
|
||||||
|
|
||||||
|
tracer := otel.Tracer("test-tracer")
|
||||||
|
meter := otel.Meter("test-meter")
|
||||||
|
|
||||||
|
// Recorder metric example
|
||||||
|
valuerecorder := metric.Must(meter).
|
||||||
|
NewFloat64Counter(
|
||||||
|
"an_important_metric",
|
||||||
|
metric.WithDescription("Measures the cumulative epicness of the app"),
|
||||||
|
)
|
||||||
|
|
||||||
|
// work begins
|
||||||
|
ctx, span := tracer.Start(
|
||||||
|
ctx,
|
||||||
|
"DifferentCollectors-Example")
|
||||||
|
defer span.End()
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
_, iSpan := tracer.Start(ctx, fmt.Sprintf("Sample-%d", i))
|
||||||
|
log.Printf("Doing really hard work (%d / 10)\n", i+1)
|
||||||
|
valuerecorder.Add(ctx, 1.0)
|
||||||
|
|
||||||
|
<-time.After(time.Second)
|
||||||
|
iSpan.End()
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Done!")
|
||||||
|
}
|
||||||
|
@ -97,25 +97,7 @@ func newGRPCExporter(t *testing.T, ctx context.Context, address string, addition
|
|||||||
return exp
|
return exp
|
||||||
}
|
}
|
||||||
|
|
||||||
func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionOption) {
|
func runEndToEndTest(t *testing.T, ctx context.Context, exp *otlp.Exporter, mcTraces, mcMetrics *mockCol) {
|
||||||
mc := runMockColAtAddr(t, "localhost:56561")
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
_ = mc.stop()
|
|
||||||
}()
|
|
||||||
|
|
||||||
<-time.After(5 * time.Millisecond)
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
exp := newGRPCExporter(t, ctx, mc.address, additionalOpts...)
|
|
||||||
defer func() {
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
if err := exp.Shutdown(ctx); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
pOpts := []sdktrace.TracerProviderOption{
|
pOpts := []sdktrace.TracerProviderOption{
|
||||||
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
|
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
|
||||||
sdktrace.WithBatcher(
|
sdktrace.WithBatcher(
|
||||||
@ -239,10 +221,11 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO
|
|||||||
|
|
||||||
// Shutdown the collector too so that we can begin
|
// Shutdown the collector too so that we can begin
|
||||||
// verification checks of expected data back.
|
// verification checks of expected data back.
|
||||||
_ = mc.stop()
|
_ = mcTraces.stop()
|
||||||
|
_ = mcMetrics.stop()
|
||||||
|
|
||||||
// Now verify that we only got two resources
|
// Now verify that we only got two resources
|
||||||
rss := mc.getResourceSpans()
|
rss := mcTraces.getResourceSpans()
|
||||||
if got, want := len(rss), 2; got != want {
|
if got, want := len(rss), 2; got != want {
|
||||||
t.Fatalf("resource span count: got %d, want %d\n", got, want)
|
t.Fatalf("resource span count: got %d, want %d\n", got, want)
|
||||||
}
|
}
|
||||||
@ -273,7 +256,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics := mc.getMetrics()
|
metrics := mcMetrics.getMetrics()
|
||||||
assert.Len(t, metrics, len(instruments), "not enough metrics exported")
|
assert.Len(t, metrics, len(instruments), "not enough metrics exported")
|
||||||
seen := make(map[string]struct{}, len(instruments))
|
seen := make(map[string]struct{}, len(instruments))
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
@ -342,6 +325,28 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionOption) {
|
||||||
|
mc := runMockColAtAddr(t, "localhost:56561")
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
_ = mc.stop()
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-time.After(5 * time.Millisecond)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
exp := newGRPCExporter(t, ctx, mc.address, additionalOpts...)
|
||||||
|
defer func() {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := exp.Shutdown(ctx); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
runEndToEndTest(t, ctx, exp, mc, mc)
|
||||||
|
}
|
||||||
|
|
||||||
func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
|
func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
|
||||||
mc := runMockCol(t)
|
mc := runMockCol(t)
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -761,3 +766,44 @@ func TestFailedMetricTransform(t *testing.T) {
|
|||||||
|
|
||||||
assert.Error(t, exp.Export(ctx, failCheckpointSet{}))
|
assert.Error(t, exp.Export(ctx, failCheckpointSet{}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMultiConnectionDriver(t *testing.T) {
|
||||||
|
mcTraces := runMockCol(t)
|
||||||
|
mcMetrics := runMockCol(t)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
_ = mcTraces.stop()
|
||||||
|
_ = mcMetrics.stop()
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-time.After(5 * time.Millisecond)
|
||||||
|
|
||||||
|
commonOpts := []otlp.GRPCConnectionOption{
|
||||||
|
otlp.WithInsecure(),
|
||||||
|
otlp.WithReconnectionPeriod(50 * time.Millisecond),
|
||||||
|
otlp.WithGRPCDialOption(grpc.WithBlock()),
|
||||||
|
}
|
||||||
|
optsTraces := append([]otlp.GRPCConnectionOption{
|
||||||
|
otlp.WithAddress(mcTraces.address),
|
||||||
|
}, commonOpts...)
|
||||||
|
optsMetrics := append([]otlp.GRPCConnectionOption{
|
||||||
|
otlp.WithAddress(mcMetrics.address),
|
||||||
|
}, commonOpts...)
|
||||||
|
|
||||||
|
tracesDriver := otlp.NewGRPCDriver(optsTraces...)
|
||||||
|
metricsDriver := otlp.NewGRPCDriver(optsMetrics...)
|
||||||
|
splitCfg := otlp.SplitConfig{
|
||||||
|
ForMetrics: metricsDriver,
|
||||||
|
ForTraces: tracesDriver,
|
||||||
|
}
|
||||||
|
driver := otlp.NewSplitDriver(splitCfg)
|
||||||
|
ctx := context.Background()
|
||||||
|
exp, err := otlp.NewExporter(ctx, driver)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create a new collector exporter: %v", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
assert.NoError(t, exp.Shutdown(ctx))
|
||||||
|
}()
|
||||||
|
runEndToEndTest(t, ctx, exp, mcTraces, mcMetrics)
|
||||||
|
}
|
||||||
|
@ -12,7 +12,7 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package otlp
|
package otlp_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -23,6 +23,7 @@ import (
|
|||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel/exporters/otlp"
|
||||||
commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1"
|
commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1"
|
||||||
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
|
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
|
||||||
resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1"
|
resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1"
|
||||||
@ -692,8 +693,8 @@ func TestStatelessExportKind(t *testing.T) {
|
|||||||
t.Run(k.name, func(t *testing.T) {
|
t.Run(k.name, func(t *testing.T) {
|
||||||
runMetricExportTests(
|
runMetricExportTests(
|
||||||
t,
|
t,
|
||||||
[]ExporterOption{
|
[]otlp.ExporterOption{
|
||||||
WithMetricExportKindSelector(
|
otlp.WithMetricExportKindSelector(
|
||||||
metricsdk.StatelessExportKindSelector(),
|
metricsdk.StatelessExportKindSelector(),
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
@ -740,7 +741,7 @@ func TestStatelessExportKind(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func runMetricExportTests(t *testing.T, opts []ExporterOption, rs []record, expected []metricpb.ResourceMetrics) {
|
func runMetricExportTests(t *testing.T, opts []otlp.ExporterOption, rs []record, expected []metricpb.ResourceMetrics) {
|
||||||
exp, driver := newExporter(t, opts...)
|
exp, driver := newExporter(t, opts...)
|
||||||
|
|
||||||
recs := map[label.Distinct][]metricsdk.Record{}
|
recs := map[label.Distinct][]metricsdk.Record{}
|
||||||
|
@ -12,7 +12,7 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package otlp
|
package otlp_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -12,7 +12,7 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package otlp
|
package otlp_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -21,8 +21,10 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel/exporters/otlp"
|
||||||
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
|
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
|
||||||
tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1"
|
tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1"
|
||||||
"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
|
"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
|
||||||
@ -32,31 +34,42 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type stubProtocolDriver struct {
|
type stubProtocolDriver struct {
|
||||||
|
started int
|
||||||
|
stopped int
|
||||||
|
tracesExported int
|
||||||
|
metricsExported int
|
||||||
|
|
||||||
|
injectedStartError error
|
||||||
|
injectedStopError error
|
||||||
|
|
||||||
rm []metricpb.ResourceMetrics
|
rm []metricpb.ResourceMetrics
|
||||||
rs []tracepb.ResourceSpans
|
rs []tracepb.ResourceSpans
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ ProtocolDriver = (*stubProtocolDriver)(nil)
|
var _ otlp.ProtocolDriver = (*stubProtocolDriver)(nil)
|
||||||
|
|
||||||
func (m *stubProtocolDriver) Start(ctx context.Context) error {
|
func (m *stubProtocolDriver) Start(ctx context.Context) error {
|
||||||
|
m.started++
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
default:
|
default:
|
||||||
return nil
|
return m.injectedStartError
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *stubProtocolDriver) Stop(ctx context.Context) error {
|
func (m *stubProtocolDriver) Stop(ctx context.Context) error {
|
||||||
|
m.stopped++
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
default:
|
default:
|
||||||
return nil
|
return m.injectedStopError
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *stubProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error {
|
func (m *stubProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error {
|
||||||
|
m.metricsExported++
|
||||||
rms, err := transform.CheckpointSet(parent, selector, cps, 1)
|
rms, err := transform.CheckpointSet(parent, selector, cps, 1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -71,6 +84,7 @@ func (m *stubProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *stubProtocolDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error {
|
func (m *stubProtocolDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error {
|
||||||
|
m.tracesExported++
|
||||||
for _, rs := range transform.SpanData(ss) {
|
for _, rs := range transform.SpanData(ss) {
|
||||||
if rs == nil {
|
if rs == nil {
|
||||||
continue
|
continue
|
||||||
@ -85,9 +99,9 @@ func (m *stubProtocolDriver) Reset() {
|
|||||||
m.rs = nil
|
m.rs = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newExporter(t *testing.T, opts ...ExporterOption) (*Exporter, *stubProtocolDriver) {
|
func newExporter(t *testing.T, opts ...otlp.ExporterOption) (*otlp.Exporter, *stubProtocolDriver) {
|
||||||
driver := &stubProtocolDriver{}
|
driver := &stubProtocolDriver{}
|
||||||
exp, err := NewExporter(context.Background(), driver, opts...)
|
exp, err := otlp.NewExporter(context.Background(), driver, opts...)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
return exp, driver
|
return exp, driver
|
||||||
}
|
}
|
||||||
@ -96,7 +110,7 @@ func TestExporterShutdownHonorsTimeout(t *testing.T) {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
e := NewUnstartedExporter(&stubProtocolDriver{})
|
e := otlp.NewUnstartedExporter(&stubProtocolDriver{})
|
||||||
if err := e.Start(ctx); err != nil {
|
if err := e.Start(ctx); err != nil {
|
||||||
t.Fatalf("failed to start exporter: %v", err)
|
t.Fatalf("failed to start exporter: %v", err)
|
||||||
}
|
}
|
||||||
@ -115,7 +129,7 @@ func TestExporterShutdownHonorsCancel(t *testing.T) {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
e := NewUnstartedExporter(&stubProtocolDriver{})
|
e := otlp.NewUnstartedExporter(&stubProtocolDriver{})
|
||||||
if err := e.Start(ctx); err != nil {
|
if err := e.Start(ctx); err != nil {
|
||||||
t.Fatalf("failed to start exporter: %v", err)
|
t.Fatalf("failed to start exporter: %v", err)
|
||||||
}
|
}
|
||||||
@ -134,7 +148,7 @@ func TestExporterShutdownNoError(t *testing.T) {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
e := NewUnstartedExporter(&stubProtocolDriver{})
|
e := otlp.NewUnstartedExporter(&stubProtocolDriver{})
|
||||||
if err := e.Start(ctx); err != nil {
|
if err := e.Start(ctx); err != nil {
|
||||||
t.Fatalf("failed to start exporter: %v", err)
|
t.Fatalf("failed to start exporter: %v", err)
|
||||||
}
|
}
|
||||||
@ -146,7 +160,7 @@ func TestExporterShutdownNoError(t *testing.T) {
|
|||||||
|
|
||||||
func TestExporterShutdownManyTimes(t *testing.T) {
|
func TestExporterShutdownManyTimes(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
e, err := NewExporter(ctx, &stubProtocolDriver{})
|
e, err := otlp.NewExporter(ctx, &stubProtocolDriver{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to start an exporter: %v", err)
|
t.Fatalf("failed to start an exporter: %v", err)
|
||||||
}
|
}
|
||||||
@ -170,3 +184,96 @@ func TestExporterShutdownManyTimes(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSplitDriver(t *testing.T) {
|
||||||
|
driverTraces := &stubProtocolDriver{}
|
||||||
|
driverMetrics := &stubProtocolDriver{}
|
||||||
|
config := otlp.SplitConfig{
|
||||||
|
ForMetrics: driverMetrics,
|
||||||
|
ForTraces: driverTraces,
|
||||||
|
}
|
||||||
|
driver := otlp.NewSplitDriver(config)
|
||||||
|
ctx := context.Background()
|
||||||
|
assert.NoError(t, driver.Start(ctx))
|
||||||
|
assert.Equal(t, 1, driverTraces.started)
|
||||||
|
assert.Equal(t, 1, driverMetrics.started)
|
||||||
|
assert.Equal(t, 0, driverTraces.stopped)
|
||||||
|
assert.Equal(t, 0, driverMetrics.stopped)
|
||||||
|
assert.Equal(t, 0, driverTraces.tracesExported)
|
||||||
|
assert.Equal(t, 0, driverTraces.metricsExported)
|
||||||
|
assert.Equal(t, 0, driverMetrics.tracesExported)
|
||||||
|
assert.Equal(t, 0, driverMetrics.metricsExported)
|
||||||
|
|
||||||
|
assert.NoError(t, driver.ExportMetrics(ctx, discCheckpointSet{}, metricsdk.StatelessExportKindSelector()))
|
||||||
|
assert.NoError(t, driver.ExportTraces(ctx, []*tracesdk.SpanSnapshot{discSpanSnapshot()}))
|
||||||
|
assert.Len(t, driverTraces.rm, 0)
|
||||||
|
assert.Len(t, driverTraces.rs, 1)
|
||||||
|
assert.Len(t, driverMetrics.rm, 1)
|
||||||
|
assert.Len(t, driverMetrics.rs, 0)
|
||||||
|
assert.Equal(t, 1, driverTraces.tracesExported)
|
||||||
|
assert.Equal(t, 0, driverTraces.metricsExported)
|
||||||
|
assert.Equal(t, 0, driverMetrics.tracesExported)
|
||||||
|
assert.Equal(t, 1, driverMetrics.metricsExported)
|
||||||
|
|
||||||
|
assert.NoError(t, driver.Stop(ctx))
|
||||||
|
assert.Equal(t, 1, driverTraces.started)
|
||||||
|
assert.Equal(t, 1, driverMetrics.started)
|
||||||
|
assert.Equal(t, 1, driverTraces.stopped)
|
||||||
|
assert.Equal(t, 1, driverMetrics.stopped)
|
||||||
|
assert.Equal(t, 1, driverTraces.tracesExported)
|
||||||
|
assert.Equal(t, 0, driverTraces.metricsExported)
|
||||||
|
assert.Equal(t, 0, driverMetrics.tracesExported)
|
||||||
|
assert.Equal(t, 1, driverMetrics.metricsExported)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSplitDriverFail(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
for i := 0; i < 16; i++ {
|
||||||
|
var (
|
||||||
|
errStartMetric error
|
||||||
|
errStartTrace error
|
||||||
|
errStopMetric error
|
||||||
|
errStopTrace error
|
||||||
|
)
|
||||||
|
if (i & 1) != 0 {
|
||||||
|
errStartTrace = errors.New("trace start failed")
|
||||||
|
}
|
||||||
|
if (i & 2) != 0 {
|
||||||
|
errStopTrace = errors.New("trace stop failed")
|
||||||
|
}
|
||||||
|
if (i & 4) != 0 {
|
||||||
|
errStartMetric = errors.New("metric start failed")
|
||||||
|
}
|
||||||
|
if (i & 8) != 0 {
|
||||||
|
errStopMetric = errors.New("metric stop failed")
|
||||||
|
}
|
||||||
|
shouldStartFail := errStartTrace != nil || errStartMetric != nil
|
||||||
|
shouldStopFail := errStopTrace != nil || errStopMetric != nil
|
||||||
|
|
||||||
|
driverTraces := &stubProtocolDriver{
|
||||||
|
injectedStartError: errStartTrace,
|
||||||
|
injectedStopError: errStopTrace,
|
||||||
|
}
|
||||||
|
driverMetrics := &stubProtocolDriver{
|
||||||
|
injectedStartError: errStartMetric,
|
||||||
|
injectedStopError: errStopMetric,
|
||||||
|
}
|
||||||
|
config := otlp.SplitConfig{
|
||||||
|
ForMetrics: driverMetrics,
|
||||||
|
ForTraces: driverTraces,
|
||||||
|
}
|
||||||
|
driver := otlp.NewSplitDriver(config)
|
||||||
|
errStart := driver.Start(ctx)
|
||||||
|
if shouldStartFail {
|
||||||
|
assert.Error(t, errStart)
|
||||||
|
} else {
|
||||||
|
assert.NoError(t, errStart)
|
||||||
|
}
|
||||||
|
errStop := driver.Stop(ctx)
|
||||||
|
if shouldStopFail {
|
||||||
|
assert.Error(t, errStop)
|
||||||
|
} else {
|
||||||
|
assert.NoError(t, errStop)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -16,6 +16,7 @@ package otlp // import "go.opentelemetry.io/otel/exporters/otlp"
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
|
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
|
||||||
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
|
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
|
||||||
@ -49,3 +50,96 @@ type ProtocolDriver interface {
|
|||||||
// take this into account by doing proper locking.
|
// take this into account by doing proper locking.
|
||||||
ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error
|
ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SplitConfig is used to configure a split driver.
|
||||||
|
type SplitConfig struct {
|
||||||
|
// ForMetrics driver will be used for sending metrics to the
|
||||||
|
// collector.
|
||||||
|
ForMetrics ProtocolDriver
|
||||||
|
// ForTraces driver will be used for sending spans to the
|
||||||
|
// collector.
|
||||||
|
ForTraces ProtocolDriver
|
||||||
|
}
|
||||||
|
|
||||||
|
type splitDriver struct {
|
||||||
|
metric ProtocolDriver
|
||||||
|
trace ProtocolDriver
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ ProtocolDriver = (*splitDriver)(nil)
|
||||||
|
|
||||||
|
// NewSplitDriver creates a protocol driver which contains two other
|
||||||
|
// protocol drivers and will forward traces to one of them and metrics
|
||||||
|
// to another.
|
||||||
|
func NewSplitDriver(cfg SplitConfig) ProtocolDriver {
|
||||||
|
return &splitDriver{
|
||||||
|
metric: cfg.ForMetrics,
|
||||||
|
trace: cfg.ForTraces,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start implements ProtocolDriver. It starts both drivers at the same
|
||||||
|
// time.
|
||||||
|
func (d *splitDriver) Start(ctx context.Context) error {
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
wg.Add(2)
|
||||||
|
var (
|
||||||
|
metricErr error
|
||||||
|
traceErr error
|
||||||
|
)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
metricErr = d.metric.Start(ctx)
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
traceErr = d.trace.Start(ctx)
|
||||||
|
}()
|
||||||
|
wg.Wait()
|
||||||
|
if metricErr != nil {
|
||||||
|
return metricErr
|
||||||
|
}
|
||||||
|
if traceErr != nil {
|
||||||
|
return traceErr
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop implements ProtocolDriver. It stops both drivers at the same
|
||||||
|
// time.
|
||||||
|
func (d *splitDriver) Stop(ctx context.Context) error {
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
wg.Add(2)
|
||||||
|
var (
|
||||||
|
metricErr error
|
||||||
|
traceErr error
|
||||||
|
)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
metricErr = d.metric.Stop(ctx)
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
traceErr = d.trace.Stop(ctx)
|
||||||
|
}()
|
||||||
|
wg.Wait()
|
||||||
|
if metricErr != nil {
|
||||||
|
return metricErr
|
||||||
|
}
|
||||||
|
if traceErr != nil {
|
||||||
|
return traceErr
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExportMetrics implements ProtocolDriver. It forwards the call to
|
||||||
|
// the driver used for sending metrics.
|
||||||
|
func (d *splitDriver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error {
|
||||||
|
return d.metric.ExportMetrics(ctx, cps, selector)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExportTraces implements ProtocolDriver. It forwards the call to the
|
||||||
|
// driver used for sending spans.
|
||||||
|
func (d *splitDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error {
|
||||||
|
return d.trace.ExportTraces(ctx, ss)
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user