diff --git a/exporters/otlp/README.md b/exporters/otlp/README.md index 24da77069..8f88a0b8d 100644 --- a/exporters/otlp/README.md +++ b/exporters/otlp/README.md @@ -92,7 +92,7 @@ Some compressors auto-register on import, such as gzip, which can be registered ### `WithHeaders(headers map[string]string)` -Headers to send when the gRPC stream connection is instantiated. +Headers to send with gRPC requests. ### `WithTLSCredentials(creds "google.golang.org/grpc/credentials".TransportCredentials)` diff --git a/exporters/otlp/mock_collector_test.go b/exporters/otlp/mock_collector_test.go index 6b3284c38..b391014d3 100644 --- a/exporters/otlp/mock_collector_test.go +++ b/exporters/otlp/mock_collector_test.go @@ -24,6 +24,7 @@ import ( "time" "google.golang.org/grpc" + metadata "google.golang.org/grpc/metadata" colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1" coltracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/trace/v1" @@ -44,8 +45,13 @@ func makeMockCollector(t *testing.T) *mockCol { } type mockTraceService struct { - mu sync.RWMutex - rsm map[string]*tracepb.ResourceSpans + mu sync.RWMutex + rsm map[string]*tracepb.ResourceSpans + headers metadata.MD +} + +func (mts *mockTraceService) getHeaders() metadata.MD { + return mts.headers } func (mts *mockTraceService) getSpans() []*tracepb.Span { @@ -70,6 +76,7 @@ func (mts *mockTraceService) getResourceSpans() []*tracepb.ResourceSpans { func (mts *mockTraceService) Export(ctx context.Context, exp *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) { mts.mu.Lock() + mts.headers, _ = metadata.FromIncomingContext(ctx) defer mts.mu.Unlock() rss := exp.GetResourceSpans() for _, rs := range rss { @@ -192,6 +199,10 @@ func (mc *mockCol) getResourceSpans() []*tracepb.ResourceSpans { return mc.traceSvc.getResourceSpans() } +func (mc *mockCol) getHeaders() metadata.MD { + return mc.traceSvc.getHeaders() +} + func (mc *mockCol) getMetrics() []*metricpb.Metric { return mc.metricSvc.getMetrics() } diff --git a/exporters/otlp/options.go b/exporters/otlp/options.go index cf86a081c..7d9b6f991 100644 --- a/exporters/otlp/options.go +++ b/exporters/otlp/options.go @@ -122,8 +122,7 @@ func WithCompressor(compressor string) ExporterOption { } } -// WithHeaders will send the provided headers when the gRPC stream connection -// is instantiated. +// WithHeaders will send the provided headers with gRPC requests func WithHeaders(headers map[string]string) ExporterOption { return func(cfg *Config) { cfg.headers = headers diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 0c4d9a30a..1db7899a0 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -50,7 +50,8 @@ type Exporter struct { backgroundConnectionDoneCh chan bool - c Config + c Config + metadata metadata.MD } var _ tracesdk.SpanBatcher = (*Exporter)(nil) @@ -77,6 +78,9 @@ func NewUnstartedExporter(opts ...ExporterOption) *Exporter { grpcServiceConfig: DefaultGRPCServiceConfig, } configureOptions(&e.c, opts...) + if len(e.c.headers) > 0 { + e.metadata = metadata.New(e.c.headers) + } // TODO (rghetia): add resources @@ -158,6 +162,13 @@ func (e *Exporter) enableConnections(cc *grpc.ClientConn) error { return nil } +func (e *Exporter) contextWithMetadata(ctx context.Context) context.Context { + if e.metadata.Len() > 0 { + return metadata.NewOutgoingContext(ctx, e.metadata) + } + return ctx +} + func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) { addr := e.prepareCollectorAddress() @@ -177,10 +188,7 @@ func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) { dialOpts = append(dialOpts, e.c.grpcDialOptions...) } - ctx := context.Background() - if len(e.c.headers) > 0 { - ctx = metadata.NewOutgoingContext(ctx, metadata.New(e.c.headers)) - } + ctx := e.contextWithMetadata(context.Background()) return grpc.DialContext(ctx, addr, dialOpts...) } @@ -246,7 +254,7 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e return errContextCanceled default: e.senderMu.Lock() - _, err := e.metricExporter.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{ + _, err := e.metricExporter.Export(e.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{ ResourceMetrics: rms, }) e.senderMu.Unlock() @@ -281,7 +289,7 @@ func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) { } e.senderMu.Lock() - _, err := e.traceExporter.Export(ctx, &coltracepb.ExportTraceServiceRequest{ + _, err := e.traceExporter.Export(e.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{ ResourceSpans: protoSpans, }) e.senderMu.Unlock() diff --git a/exporters/otlp/otlp_integration_test.go b/exporters/otlp/otlp_integration_test.go index 0f01eeace..dd7abba58 100644 --- a/exporters/otlp/otlp_integration_test.go +++ b/exporters/otlp/otlp_integration_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1" @@ -400,3 +401,26 @@ func TestNewExporter_withAddress(t *testing.T) { t.Fatalf("Unexpected Start error: %v", err) } } + +func TestNewExporter_withHeaders(t *testing.T) { + mc := runMockCol(t) + defer func() { + _ = mc.stop() + }() + + exp, _ := otlp.NewExporter( + otlp.WithInsecure(), + otlp.WithReconnectionPeriod(50*time.Millisecond), + otlp.WithAddress(mc.address), + otlp.WithHeaders(map[string]string{"header1": "value1"}), + ) + exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "in the midst"}}) + + defer func() { + _ = exp.Stop() + }() + + headers := mc.getHeaders() + require.Len(t, headers.Get("header1"), 1) + assert.Equal(t, "value1", headers.Get("header1")[0]) +}