1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-07-17 01:12:45 +02:00

Merge pull request #806 from codeboten/send-headers

Send configured headers with every request
This commit is contained in:
Tyler Yahn
2020-06-09 15:36:43 -07:00
committed by GitHub
5 changed files with 54 additions and 12 deletions

View File

@ -92,7 +92,7 @@ Some compressors auto-register on import, such as gzip, which can be registered
### `WithHeaders(headers map[string]string)` ### `WithHeaders(headers map[string]string)`
Headers to send when the gRPC stream connection is instantiated. Headers to send with gRPC requests.
### `WithTLSCredentials(creds "google.golang.org/grpc/credentials".TransportCredentials)` ### `WithTLSCredentials(creds "google.golang.org/grpc/credentials".TransportCredentials)`

View File

@ -24,6 +24,7 @@ import (
"time" "time"
"google.golang.org/grpc" "google.golang.org/grpc"
metadata "google.golang.org/grpc/metadata"
colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1" colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1"
coltracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/trace/v1" coltracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/trace/v1"
@ -44,8 +45,13 @@ func makeMockCollector(t *testing.T) *mockCol {
} }
type mockTraceService struct { type mockTraceService struct {
mu sync.RWMutex mu sync.RWMutex
rsm map[string]*tracepb.ResourceSpans rsm map[string]*tracepb.ResourceSpans
headers metadata.MD
}
func (mts *mockTraceService) getHeaders() metadata.MD {
return mts.headers
} }
func (mts *mockTraceService) getSpans() []*tracepb.Span { func (mts *mockTraceService) getSpans() []*tracepb.Span {
@ -70,6 +76,7 @@ func (mts *mockTraceService) getResourceSpans() []*tracepb.ResourceSpans {
func (mts *mockTraceService) Export(ctx context.Context, exp *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) { func (mts *mockTraceService) Export(ctx context.Context, exp *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) {
mts.mu.Lock() mts.mu.Lock()
mts.headers, _ = metadata.FromIncomingContext(ctx)
defer mts.mu.Unlock() defer mts.mu.Unlock()
rss := exp.GetResourceSpans() rss := exp.GetResourceSpans()
for _, rs := range rss { for _, rs := range rss {
@ -192,6 +199,10 @@ func (mc *mockCol) getResourceSpans() []*tracepb.ResourceSpans {
return mc.traceSvc.getResourceSpans() return mc.traceSvc.getResourceSpans()
} }
func (mc *mockCol) getHeaders() metadata.MD {
return mc.traceSvc.getHeaders()
}
func (mc *mockCol) getMetrics() []*metricpb.Metric { func (mc *mockCol) getMetrics() []*metricpb.Metric {
return mc.metricSvc.getMetrics() return mc.metricSvc.getMetrics()
} }

View File

@ -122,8 +122,7 @@ func WithCompressor(compressor string) ExporterOption {
} }
} }
// WithHeaders will send the provided headers when the gRPC stream connection // WithHeaders will send the provided headers with gRPC requests
// is instantiated.
func WithHeaders(headers map[string]string) ExporterOption { func WithHeaders(headers map[string]string) ExporterOption {
return func(cfg *Config) { return func(cfg *Config) {
cfg.headers = headers cfg.headers = headers

View File

@ -50,7 +50,8 @@ type Exporter struct {
backgroundConnectionDoneCh chan bool backgroundConnectionDoneCh chan bool
c Config c Config
metadata metadata.MD
} }
var _ tracesdk.SpanBatcher = (*Exporter)(nil) var _ tracesdk.SpanBatcher = (*Exporter)(nil)
@ -77,6 +78,9 @@ func NewUnstartedExporter(opts ...ExporterOption) *Exporter {
grpcServiceConfig: DefaultGRPCServiceConfig, grpcServiceConfig: DefaultGRPCServiceConfig,
} }
configureOptions(&e.c, opts...) configureOptions(&e.c, opts...)
if len(e.c.headers) > 0 {
e.metadata = metadata.New(e.c.headers)
}
// TODO (rghetia): add resources // TODO (rghetia): add resources
@ -158,6 +162,13 @@ func (e *Exporter) enableConnections(cc *grpc.ClientConn) error {
return nil return nil
} }
func (e *Exporter) contextWithMetadata(ctx context.Context) context.Context {
if e.metadata.Len() > 0 {
return metadata.NewOutgoingContext(ctx, e.metadata)
}
return ctx
}
func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) { func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) {
addr := e.prepareCollectorAddress() addr := e.prepareCollectorAddress()
@ -177,10 +188,7 @@ func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) {
dialOpts = append(dialOpts, e.c.grpcDialOptions...) dialOpts = append(dialOpts, e.c.grpcDialOptions...)
} }
ctx := context.Background() ctx := e.contextWithMetadata(context.Background())
if len(e.c.headers) > 0 {
ctx = metadata.NewOutgoingContext(ctx, metadata.New(e.c.headers))
}
return grpc.DialContext(ctx, addr, dialOpts...) return grpc.DialContext(ctx, addr, dialOpts...)
} }
@ -246,7 +254,7 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e
return errContextCanceled return errContextCanceled
default: default:
e.senderMu.Lock() e.senderMu.Lock()
_, err := e.metricExporter.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{ _, err := e.metricExporter.Export(e.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: rms, ResourceMetrics: rms,
}) })
e.senderMu.Unlock() e.senderMu.Unlock()
@ -281,7 +289,7 @@ func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) {
} }
e.senderMu.Lock() e.senderMu.Lock()
_, err := e.traceExporter.Export(ctx, &coltracepb.ExportTraceServiceRequest{ _, err := e.traceExporter.Export(e.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{
ResourceSpans: protoSpans, ResourceSpans: protoSpans,
}) })
e.senderMu.Unlock() e.senderMu.Unlock()

View File

@ -23,6 +23,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1" metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
@ -400,3 +401,26 @@ func TestNewExporter_withAddress(t *testing.T) {
t.Fatalf("Unexpected Start error: %v", err) t.Fatalf("Unexpected Start error: %v", err)
} }
} }
func TestNewExporter_withHeaders(t *testing.T) {
mc := runMockCol(t)
defer func() {
_ = mc.stop()
}()
exp, _ := otlp.NewExporter(
otlp.WithInsecure(),
otlp.WithReconnectionPeriod(50*time.Millisecond),
otlp.WithAddress(mc.address),
otlp.WithHeaders(map[string]string{"header1": "value1"}),
)
exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "in the midst"}})
defer func() {
_ = exp.Stop()
}()
headers := mc.getHeaders()
require.Len(t, headers.Get("header1"), 1)
assert.Equal(t, "value1", headers.Get("header1")[0])
}