1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-03 22:52:30 +02:00

Add propagator option for gRPC instrumentation (#986)

* Add propagator option for gRPC instrumentation

* Update CHANGELOG

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
Sam Xie 2020-07-30 06:58:34 +08:00 committed by GitHub
parent 8fbaa9d432
commit 26e85e1830
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 12 deletions

View File

@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The Zipkin exporter now has `NewExportPipeline` and `InstallNewPipeline` constructor functions to match the common pattern. - The Zipkin exporter now has `NewExportPipeline` and `InstallNewPipeline` constructor functions to match the common pattern.
These function build a new exporter with default SDK options and register the exporter with the `global` package respectively. (#944) These function build a new exporter with default SDK options and register the exporter with the `global` package respectively. (#944)
- Add propagator option for gRPC instrumentation. (#986)
### Changed ### Changed

View File

@ -63,14 +63,14 @@ var (
// UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable // UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable
// for use in a grpc.Dial call. // for use in a grpc.Dial call.
func UnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor { func UnaryClientInterceptor(tracer trace.Tracer, opts ...Option) grpc.UnaryClientInterceptor {
return func( return func(
ctx context.Context, ctx context.Context,
method string, method string,
req, reply interface{}, req, reply interface{},
cc *grpc.ClientConn, cc *grpc.ClientConn,
invoker grpc.UnaryInvoker, invoker grpc.UnaryInvoker,
opts ...grpc.CallOption, callOpts ...grpc.CallOption,
) error { ) error {
requestMetadata, _ := metadata.FromOutgoingContext(ctx) requestMetadata, _ := metadata.FromOutgoingContext(ctx)
metadataCopy := requestMetadata.Copy() metadataCopy := requestMetadata.Copy()
@ -85,12 +85,12 @@ func UnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor {
) )
defer span.End() defer span.End()
Inject(ctx, &metadataCopy) Inject(ctx, &metadataCopy, opts...)
ctx = metadata.NewOutgoingContext(ctx, metadataCopy) ctx = metadata.NewOutgoingContext(ctx, metadataCopy)
messageSent.Event(ctx, 1, req) messageSent.Event(ctx, 1, req)
err := invoker(ctx, method, req, reply, cc, opts...) err := invoker(ctx, method, req, reply, cc, callOpts...)
messageReceived.Event(ctx, 1, reply) messageReceived.Event(ctx, 1, reply)
@ -236,14 +236,14 @@ func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable // StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
// for use in a grpc.Dial call. // for use in a grpc.Dial call.
func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor { func StreamClientInterceptor(tracer trace.Tracer, opts ...Option) grpc.StreamClientInterceptor {
return func( return func(
ctx context.Context, ctx context.Context,
desc *grpc.StreamDesc, desc *grpc.StreamDesc,
cc *grpc.ClientConn, cc *grpc.ClientConn,
method string, method string,
streamer grpc.Streamer, streamer grpc.Streamer,
opts ...grpc.CallOption, callOpts ...grpc.CallOption,
) (grpc.ClientStream, error) { ) (grpc.ClientStream, error) {
requestMetadata, _ := metadata.FromOutgoingContext(ctx) requestMetadata, _ := metadata.FromOutgoingContext(ctx)
metadataCopy := requestMetadata.Copy() metadataCopy := requestMetadata.Copy()
@ -257,10 +257,10 @@ func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor {
trace.WithAttributes(attr...), trace.WithAttributes(attr...),
) )
Inject(ctx, &metadataCopy) Inject(ctx, &metadataCopy, opts...)
ctx = metadata.NewOutgoingContext(ctx, metadataCopy) ctx = metadata.NewOutgoingContext(ctx, metadataCopy)
s, err := streamer(ctx, desc, cc, method, opts...) s, err := streamer(ctx, desc, cc, method, callOpts...)
stream := wrapClientStream(s, desc) stream := wrapClientStream(s, desc)
go func() { go func() {
@ -282,7 +282,7 @@ func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor {
// UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable // UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable
// for use in a grpc.NewServer call. // for use in a grpc.NewServer call.
func UnaryServerInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor { func UnaryServerInterceptor(tracer trace.Tracer, opts ...Option) grpc.UnaryServerInterceptor {
return func( return func(
ctx context.Context, ctx context.Context,
req interface{}, req interface{},
@ -292,7 +292,7 @@ func UnaryServerInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
requestMetadata, _ := metadata.FromIncomingContext(ctx) requestMetadata, _ := metadata.FromIncomingContext(ctx)
metadataCopy := requestMetadata.Copy() metadataCopy := requestMetadata.Copy()
entries, spanCtx := Extract(ctx, &metadataCopy) entries, spanCtx := Extract(ctx, &metadataCopy, opts...)
ctx = correlation.ContextWithMap(ctx, correlation.NewMap(correlation.MapUpdate{ ctx = correlation.ContextWithMap(ctx, correlation.NewMap(correlation.MapUpdate{
MultiKV: entries, MultiKV: entries,
})) }))
@ -364,7 +364,7 @@ func wrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream {
// StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable // StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
// for use in a grpc.NewServer call. // for use in a grpc.NewServer call.
func StreamServerInterceptor(tracer trace.Tracer) grpc.StreamServerInterceptor { func StreamServerInterceptor(tracer trace.Tracer, opts ...Option) grpc.StreamServerInterceptor {
return func( return func(
srv interface{}, srv interface{},
ss grpc.ServerStream, ss grpc.ServerStream,
@ -376,7 +376,7 @@ func StreamServerInterceptor(tracer trace.Tracer) grpc.StreamServerInterceptor {
requestMetadata, _ := metadata.FromIncomingContext(ctx) requestMetadata, _ := metadata.FromIncomingContext(ctx)
metadataCopy := requestMetadata.Copy() metadataCopy := requestMetadata.Copy()
entries, spanCtx := Extract(ctx, &metadataCopy) entries, spanCtx := Extract(ctx, &metadataCopy, opts...)
ctx = correlation.ContextWithMap(ctx, correlation.NewMap(correlation.MapUpdate{ ctx = correlation.ContextWithMap(ctx, correlation.NewMap(correlation.MapUpdate{
MultiKV: entries, MultiKV: entries,
})) }))