diff --git a/CHANGELOG.md b/CHANGELOG.md index a7062c744..d59d1cb32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add the `ReadOnlySpan` and `ReadWriteSpan` interfaces to provide better control for accessing span data. (#1360) - `NewGRPCDriver` function returns a `ProtocolDriver` that maintains a single gRPC connection to the collector. (#1369) +- `NewSplitDriver` for OTLP exporter that allows sending traces and metrics to different endpoints. (#1418) ### Changed diff --git a/exporters/otlp/example_test.go b/exporters/otlp/example_test.go index 35d0727d6..56ec757fe 100644 --- a/exporters/otlp/example_test.go +++ b/exporters/otlp/example_test.go @@ -24,6 +24,10 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/metric/controller/push" + "go.opentelemetry.io/otel/sdk/metric/processor/basic" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) @@ -51,6 +55,13 @@ func Example_insecure() { sdktrace.WithMaxExportBatchSize(10), ), ) + defer func() { + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + if err := tp.Shutdown(ctx); err != nil { + otel.Handle(err) + } + }() otel.SetTracerProvider(tp) tracer := otel.Tracer("test-tracer") @@ -97,6 +108,13 @@ func Example_withTLS() { sdktrace.WithMaxExportBatchSize(10), ), ) + defer func() { + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + if err := tp.Shutdown(ctx); err != nil { + otel.Handle(err) + } + }() otel.SetTracerProvider(tp) tracer := otel.Tracer("test-tracer") @@ -111,3 +129,91 @@ func Example_withTLS() { iSpan.End() } } + +func Example_withDifferentSignalCollectors() { + + // Set different endpoints for the metrics and traces collectors + metricsDriver := otlp.NewGRPCDriver( + otlp.WithInsecure(), + otlp.WithAddress("localhost:30080"), + ) + tracesDriver := otlp.NewGRPCDriver( + otlp.WithInsecure(), + otlp.WithAddress("localhost:30082"), + ) + splitCfg := otlp.SplitConfig{ + ForMetrics: metricsDriver, + ForTraces: tracesDriver, + } + driver := otlp.NewSplitDriver(splitCfg) + ctx := context.Background() + exp, err := otlp.NewExporter(ctx, driver) + if err != nil { + log.Fatalf("failed to create the collector exporter: %v", err) + } + + defer func() { + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + if err := exp.Shutdown(ctx); err != nil { + otel.Handle(err) + } + }() + + tp := sdktrace.NewTracerProvider( + sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), + sdktrace.WithBatcher( + exp, + // add following two options to ensure flush + sdktrace.WithBatchTimeout(5), + sdktrace.WithMaxExportBatchSize(10), + ), + ) + defer func() { + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + if err := tp.Shutdown(ctx); err != nil { + otel.Handle(err) + } + }() + otel.SetTracerProvider(tp) + + pusher := push.New( + basic.New( + simple.NewWithExactDistribution(), + exp, + ), + exp, + push.WithPeriod(2*time.Second), + ) + otel.SetMeterProvider(pusher.MeterProvider()) + + pusher.Start() + defer pusher.Stop() // pushes any last exports to the receiver + + tracer := otel.Tracer("test-tracer") + meter := otel.Meter("test-meter") + + // Recorder metric example + valuerecorder := metric.Must(meter). + NewFloat64Counter( + "an_important_metric", + metric.WithDescription("Measures the cumulative epicness of the app"), + ) + + // work begins + ctx, span := tracer.Start( + ctx, + "DifferentCollectors-Example") + defer span.End() + for i := 0; i < 10; i++ { + _, iSpan := tracer.Start(ctx, fmt.Sprintf("Sample-%d", i)) + log.Printf("Doing really hard work (%d / 10)\n", i+1) + valuerecorder.Add(ctx, 1.0) + + <-time.After(time.Second) + iSpan.End() + } + + log.Printf("Done!") +} diff --git a/exporters/otlp/otlp_integration_test.go b/exporters/otlp/otlp_integration_test.go index dab79d72b..1030ab9f0 100644 --- a/exporters/otlp/otlp_integration_test.go +++ b/exporters/otlp/otlp_integration_test.go @@ -97,25 +97,7 @@ func newGRPCExporter(t *testing.T, ctx context.Context, address string, addition return exp } -func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionOption) { - mc := runMockColAtAddr(t, "localhost:56561") - - defer func() { - _ = mc.stop() - }() - - <-time.After(5 * time.Millisecond) - - ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.address, additionalOpts...) - defer func() { - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - if err := exp.Shutdown(ctx); err != nil { - panic(err) - } - }() - +func runEndToEndTest(t *testing.T, ctx context.Context, exp *otlp.Exporter, mcTraces, mcMetrics *mockCol) { pOpts := []sdktrace.TracerProviderOption{ sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), sdktrace.WithBatcher( @@ -239,10 +221,11 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO // Shutdown the collector too so that we can begin // verification checks of expected data back. - _ = mc.stop() + _ = mcTraces.stop() + _ = mcMetrics.stop() // Now verify that we only got two resources - rss := mc.getResourceSpans() + rss := mcTraces.getResourceSpans() if got, want := len(rss), 2; got != want { t.Fatalf("resource span count: got %d, want %d\n", got, want) } @@ -273,7 +256,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO } } - metrics := mc.getMetrics() + metrics := mcMetrics.getMetrics() assert.Len(t, metrics, len(instruments), "not enough metrics exported") seen := make(map[string]struct{}, len(instruments)) for _, m := range metrics { @@ -342,6 +325,28 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO } } +func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionOption) { + mc := runMockColAtAddr(t, "localhost:56561") + + defer func() { + _ = mc.stop() + }() + + <-time.After(5 * time.Millisecond) + + ctx := context.Background() + exp := newGRPCExporter(t, ctx, mc.address, additionalOpts...) + defer func() { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err := exp.Shutdown(ctx); err != nil { + panic(err) + } + }() + + runEndToEndTest(t, ctx, exp, mc, mc) +} + func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) { mc := runMockCol(t) defer func() { @@ -761,3 +766,44 @@ func TestFailedMetricTransform(t *testing.T) { assert.Error(t, exp.Export(ctx, failCheckpointSet{})) } + +func TestMultiConnectionDriver(t *testing.T) { + mcTraces := runMockCol(t) + mcMetrics := runMockCol(t) + + defer func() { + _ = mcTraces.stop() + _ = mcMetrics.stop() + }() + + <-time.After(5 * time.Millisecond) + + commonOpts := []otlp.GRPCConnectionOption{ + otlp.WithInsecure(), + otlp.WithReconnectionPeriod(50 * time.Millisecond), + otlp.WithGRPCDialOption(grpc.WithBlock()), + } + optsTraces := append([]otlp.GRPCConnectionOption{ + otlp.WithAddress(mcTraces.address), + }, commonOpts...) + optsMetrics := append([]otlp.GRPCConnectionOption{ + otlp.WithAddress(mcMetrics.address), + }, commonOpts...) + + tracesDriver := otlp.NewGRPCDriver(optsTraces...) + metricsDriver := otlp.NewGRPCDriver(optsMetrics...) + splitCfg := otlp.SplitConfig{ + ForMetrics: metricsDriver, + ForTraces: tracesDriver, + } + driver := otlp.NewSplitDriver(splitCfg) + ctx := context.Background() + exp, err := otlp.NewExporter(ctx, driver) + if err != nil { + t.Fatalf("failed to create a new collector exporter: %v", err) + } + defer func() { + assert.NoError(t, exp.Shutdown(ctx)) + }() + runEndToEndTest(t, ctx, exp, mcTraces, mcMetrics) +} diff --git a/exporters/otlp/otlp_metric_test.go b/exporters/otlp/otlp_metric_test.go index 660d17ece..9d804b84d 100644 --- a/exporters/otlp/otlp_metric_test.go +++ b/exporters/otlp/otlp_metric_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package otlp +package otlp_test import ( "context" @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/exporters/otlp" commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1" metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1" @@ -692,8 +693,8 @@ func TestStatelessExportKind(t *testing.T) { t.Run(k.name, func(t *testing.T) { runMetricExportTests( t, - []ExporterOption{ - WithMetricExportKindSelector( + []otlp.ExporterOption{ + otlp.WithMetricExportKindSelector( metricsdk.StatelessExportKindSelector(), ), }, @@ -740,7 +741,7 @@ func TestStatelessExportKind(t *testing.T) { } } -func runMetricExportTests(t *testing.T, opts []ExporterOption, rs []record, expected []metricpb.ResourceMetrics) { +func runMetricExportTests(t *testing.T, opts []otlp.ExporterOption, rs []record, expected []metricpb.ResourceMetrics) { exp, driver := newExporter(t, opts...) recs := map[label.Distinct][]metricsdk.Record{} diff --git a/exporters/otlp/otlp_span_test.go b/exporters/otlp/otlp_span_test.go index c2c6aa6c9..b446fc0e3 100644 --- a/exporters/otlp/otlp_span_test.go +++ b/exporters/otlp/otlp_span_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package otlp +package otlp_test import ( "context" diff --git a/exporters/otlp/otlp_test.go b/exporters/otlp/otlp_test.go index 1a10f5e28..470c465aa 100644 --- a/exporters/otlp/otlp_test.go +++ b/exporters/otlp/otlp_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package otlp +package otlp_test import ( "context" @@ -21,8 +21,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/exporters/otlp" metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1" "go.opentelemetry.io/otel/exporters/otlp/internal/transform" @@ -32,31 +34,42 @@ import ( ) type stubProtocolDriver struct { + started int + stopped int + tracesExported int + metricsExported int + + injectedStartError error + injectedStopError error + rm []metricpb.ResourceMetrics rs []tracepb.ResourceSpans } -var _ ProtocolDriver = (*stubProtocolDriver)(nil) +var _ otlp.ProtocolDriver = (*stubProtocolDriver)(nil) func (m *stubProtocolDriver) Start(ctx context.Context) error { + m.started++ select { case <-ctx.Done(): return ctx.Err() default: - return nil + return m.injectedStartError } } func (m *stubProtocolDriver) Stop(ctx context.Context) error { + m.stopped++ select { case <-ctx.Done(): return ctx.Err() default: - return nil + return m.injectedStopError } } func (m *stubProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error { + m.metricsExported++ rms, err := transform.CheckpointSet(parent, selector, cps, 1) if err != nil { return err @@ -71,6 +84,7 @@ func (m *stubProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk } func (m *stubProtocolDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { + m.tracesExported++ for _, rs := range transform.SpanData(ss) { if rs == nil { continue @@ -85,9 +99,9 @@ func (m *stubProtocolDriver) Reset() { m.rs = nil } -func newExporter(t *testing.T, opts ...ExporterOption) (*Exporter, *stubProtocolDriver) { +func newExporter(t *testing.T, opts ...otlp.ExporterOption) (*otlp.Exporter, *stubProtocolDriver) { driver := &stubProtocolDriver{} - exp, err := NewExporter(context.Background(), driver, opts...) + exp, err := otlp.NewExporter(context.Background(), driver, opts...) require.NoError(t, err) return exp, driver } @@ -96,7 +110,7 @@ func TestExporterShutdownHonorsTimeout(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() - e := NewUnstartedExporter(&stubProtocolDriver{}) + e := otlp.NewUnstartedExporter(&stubProtocolDriver{}) if err := e.Start(ctx); err != nil { t.Fatalf("failed to start exporter: %v", err) } @@ -115,7 +129,7 @@ func TestExporterShutdownHonorsCancel(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() - e := NewUnstartedExporter(&stubProtocolDriver{}) + e := otlp.NewUnstartedExporter(&stubProtocolDriver{}) if err := e.Start(ctx); err != nil { t.Fatalf("failed to start exporter: %v", err) } @@ -134,7 +148,7 @@ func TestExporterShutdownNoError(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() - e := NewUnstartedExporter(&stubProtocolDriver{}) + e := otlp.NewUnstartedExporter(&stubProtocolDriver{}) if err := e.Start(ctx); err != nil { t.Fatalf("failed to start exporter: %v", err) } @@ -146,7 +160,7 @@ func TestExporterShutdownNoError(t *testing.T) { func TestExporterShutdownManyTimes(t *testing.T) { ctx := context.Background() - e, err := NewExporter(ctx, &stubProtocolDriver{}) + e, err := otlp.NewExporter(ctx, &stubProtocolDriver{}) if err != nil { t.Fatalf("failed to start an exporter: %v", err) } @@ -170,3 +184,96 @@ func TestExporterShutdownManyTimes(t *testing.T) { } } } + +func TestSplitDriver(t *testing.T) { + driverTraces := &stubProtocolDriver{} + driverMetrics := &stubProtocolDriver{} + config := otlp.SplitConfig{ + ForMetrics: driverMetrics, + ForTraces: driverTraces, + } + driver := otlp.NewSplitDriver(config) + ctx := context.Background() + assert.NoError(t, driver.Start(ctx)) + assert.Equal(t, 1, driverTraces.started) + assert.Equal(t, 1, driverMetrics.started) + assert.Equal(t, 0, driverTraces.stopped) + assert.Equal(t, 0, driverMetrics.stopped) + assert.Equal(t, 0, driverTraces.tracesExported) + assert.Equal(t, 0, driverTraces.metricsExported) + assert.Equal(t, 0, driverMetrics.tracesExported) + assert.Equal(t, 0, driverMetrics.metricsExported) + + assert.NoError(t, driver.ExportMetrics(ctx, discCheckpointSet{}, metricsdk.StatelessExportKindSelector())) + assert.NoError(t, driver.ExportTraces(ctx, []*tracesdk.SpanSnapshot{discSpanSnapshot()})) + assert.Len(t, driverTraces.rm, 0) + assert.Len(t, driverTraces.rs, 1) + assert.Len(t, driverMetrics.rm, 1) + assert.Len(t, driverMetrics.rs, 0) + assert.Equal(t, 1, driverTraces.tracesExported) + assert.Equal(t, 0, driverTraces.metricsExported) + assert.Equal(t, 0, driverMetrics.tracesExported) + assert.Equal(t, 1, driverMetrics.metricsExported) + + assert.NoError(t, driver.Stop(ctx)) + assert.Equal(t, 1, driverTraces.started) + assert.Equal(t, 1, driverMetrics.started) + assert.Equal(t, 1, driverTraces.stopped) + assert.Equal(t, 1, driverMetrics.stopped) + assert.Equal(t, 1, driverTraces.tracesExported) + assert.Equal(t, 0, driverTraces.metricsExported) + assert.Equal(t, 0, driverMetrics.tracesExported) + assert.Equal(t, 1, driverMetrics.metricsExported) +} + +func TestSplitDriverFail(t *testing.T) { + ctx := context.Background() + for i := 0; i < 16; i++ { + var ( + errStartMetric error + errStartTrace error + errStopMetric error + errStopTrace error + ) + if (i & 1) != 0 { + errStartTrace = errors.New("trace start failed") + } + if (i & 2) != 0 { + errStopTrace = errors.New("trace stop failed") + } + if (i & 4) != 0 { + errStartMetric = errors.New("metric start failed") + } + if (i & 8) != 0 { + errStopMetric = errors.New("metric stop failed") + } + shouldStartFail := errStartTrace != nil || errStartMetric != nil + shouldStopFail := errStopTrace != nil || errStopMetric != nil + + driverTraces := &stubProtocolDriver{ + injectedStartError: errStartTrace, + injectedStopError: errStopTrace, + } + driverMetrics := &stubProtocolDriver{ + injectedStartError: errStartMetric, + injectedStopError: errStopMetric, + } + config := otlp.SplitConfig{ + ForMetrics: driverMetrics, + ForTraces: driverTraces, + } + driver := otlp.NewSplitDriver(config) + errStart := driver.Start(ctx) + if shouldStartFail { + assert.Error(t, errStart) + } else { + assert.NoError(t, errStart) + } + errStop := driver.Stop(ctx) + if shouldStopFail { + assert.Error(t, errStop) + } else { + assert.NoError(t, errStop) + } + } +} diff --git a/exporters/otlp/protocoldriver.go b/exporters/otlp/protocoldriver.go index c4992ba1e..9be65e8de 100644 --- a/exporters/otlp/protocoldriver.go +++ b/exporters/otlp/protocoldriver.go @@ -16,6 +16,7 @@ package otlp // import "go.opentelemetry.io/otel/exporters/otlp" import ( "context" + "sync" metricsdk "go.opentelemetry.io/otel/sdk/export/metric" tracesdk "go.opentelemetry.io/otel/sdk/export/trace" @@ -49,3 +50,96 @@ type ProtocolDriver interface { // take this into account by doing proper locking. ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error } + +// SplitConfig is used to configure a split driver. +type SplitConfig struct { + // ForMetrics driver will be used for sending metrics to the + // collector. + ForMetrics ProtocolDriver + // ForTraces driver will be used for sending spans to the + // collector. + ForTraces ProtocolDriver +} + +type splitDriver struct { + metric ProtocolDriver + trace ProtocolDriver +} + +var _ ProtocolDriver = (*splitDriver)(nil) + +// NewSplitDriver creates a protocol driver which contains two other +// protocol drivers and will forward traces to one of them and metrics +// to another. +func NewSplitDriver(cfg SplitConfig) ProtocolDriver { + return &splitDriver{ + metric: cfg.ForMetrics, + trace: cfg.ForTraces, + } +} + +// Start implements ProtocolDriver. It starts both drivers at the same +// time. +func (d *splitDriver) Start(ctx context.Context) error { + wg := sync.WaitGroup{} + wg.Add(2) + var ( + metricErr error + traceErr error + ) + go func() { + defer wg.Done() + metricErr = d.metric.Start(ctx) + }() + go func() { + defer wg.Done() + traceErr = d.trace.Start(ctx) + }() + wg.Wait() + if metricErr != nil { + return metricErr + } + if traceErr != nil { + return traceErr + } + return nil +} + +// Stop implements ProtocolDriver. It stops both drivers at the same +// time. +func (d *splitDriver) Stop(ctx context.Context) error { + wg := sync.WaitGroup{} + wg.Add(2) + var ( + metricErr error + traceErr error + ) + go func() { + defer wg.Done() + metricErr = d.metric.Stop(ctx) + }() + go func() { + defer wg.Done() + traceErr = d.trace.Stop(ctx) + }() + wg.Wait() + if metricErr != nil { + return metricErr + } + if traceErr != nil { + return traceErr + } + return nil +} + +// ExportMetrics implements ProtocolDriver. It forwards the call to +// the driver used for sending metrics. +func (d *splitDriver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error { + return d.metric.ExportMetrics(ctx, cps, selector) +} + +// ExportTraces implements ProtocolDriver. It forwards the call to the +// driver used for sending spans. +func (d *splitDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { + return d.trace.ExportTraces(ctx, ss) +}