1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2024-12-12 10:04:29 +02:00
opentelemetry-go/instrumentation/grpctrace/interceptor.go
Tyler Yahn 54fffd6467
Update grpctrace instrumentation span names (#922)
* Update grpctrace instrumentation span names

Span names MUST not contain the leading slash (`/`) that the grpc
package prepends to all `FullMethod` values. This replaces the
`serviceFromFullMethod` function with a parsing function. This parsing
function returns an span name adhering to the OpenTelemetry semantic
conventions as well as formatted span attributes.

Additionally, the service name needs to include the package if one
exists. This updates that attribute accordingly.

Once #900 is merged the method attributes can be added by uncommenting.

Resolves #916

* Update Changelog

* Update comment to plural

* Switch from regexp to string parsing

* Consolidate attributes before creating span

* Update Changelog with addition of rpc.method in grpctrace

* Fix test spanMap lookup key

* Update instrumentation/grpctrace/interceptor.go

Co-authored-by: ET <evantorrie@users.noreply.github.com>

* Unify on explicit typed return value

* Fix copy paste error

Co-authored-by: ET <evantorrie@users.noreply.github.com>
2020-07-09 13:12:00 -07:00

484 lines
12 KiB
Go

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpctrace
// gRPC tracing middleware
// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/rpc.md
import (
"context"
"io"
"net"
"strings"
"go.opentelemetry.io/otel/api/standard"
"github.com/golang/protobuf/proto" //nolint:staticcheck
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"go.opentelemetry.io/otel/api/correlation"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/trace"
)
type messageType kv.KeyValue
// Event adds an event of the messageType to the span associated with the
// passed context with id and size (if message is a proto message).
func (m messageType) Event(ctx context.Context, id int, message interface{}) {
span := trace.SpanFromContext(ctx)
if p, ok := message.(proto.Message); ok {
span.AddEvent(ctx, "message",
kv.KeyValue(m),
standard.RPCMessageIDKey.Int(id),
standard.RPCMessageUncompressedSizeKey.Int(proto.Size(p)),
)
} else {
span.AddEvent(ctx, "message",
kv.KeyValue(m),
standard.RPCMessageIDKey.Int(id),
)
}
}
var (
messageSent = messageType(standard.RPCMessageTypeSent)
messageReceived = messageType(standard.RPCMessageTypeReceived)
)
// UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable
// for use in a grpc.Dial call.
//
// For example:
// tracer := global.Tracer("client-tracer")
// s := grpc.NewServer(
// grpc.WithUnaryInterceptor(grpctrace.UnaryClientInterceptor(tracer)),
// ..., // (existing DialOptions))
func UnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
requestMetadata, _ := metadata.FromOutgoingContext(ctx)
metadataCopy := requestMetadata.Copy()
name, attr := spanInfo(method, cc.Target())
var span trace.Span
ctx, span = tracer.Start(
ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attr...),
)
defer span.End()
Inject(ctx, &metadataCopy)
ctx = metadata.NewOutgoingContext(ctx, metadataCopy)
messageSent.Event(ctx, 1, req)
err := invoker(ctx, method, req, reply, cc, opts...)
messageReceived.Event(ctx, 1, reply)
if err != nil {
s, _ := status.FromError(err)
span.SetStatus(s.Code(), s.Message())
}
return err
}
}
type streamEventType int
type streamEvent struct {
Type streamEventType
Err error
}
const (
closeEvent streamEventType = iota
receiveEndEvent
errorEvent
)
// clientStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and
// SendMsg method call.
type clientStream struct {
grpc.ClientStream
desc *grpc.StreamDesc
events chan streamEvent
eventsDone chan struct{}
finished chan error
receivedMessageID int
sentMessageID int
}
var _ = proto.Marshal
func (w *clientStream) RecvMsg(m interface{}) error {
err := w.ClientStream.RecvMsg(m)
if err == nil && !w.desc.ServerStreams {
w.sendStreamEvent(receiveEndEvent, nil)
} else if err == io.EOF {
w.sendStreamEvent(receiveEndEvent, nil)
} else if err != nil {
w.sendStreamEvent(errorEvent, err)
} else {
w.receivedMessageID++
messageReceived.Event(w.Context(), w.receivedMessageID, m)
}
return err
}
func (w *clientStream) SendMsg(m interface{}) error {
err := w.ClientStream.SendMsg(m)
w.sentMessageID++
messageSent.Event(w.Context(), w.sentMessageID, m)
if err != nil {
w.sendStreamEvent(errorEvent, err)
}
return err
}
func (w *clientStream) Header() (metadata.MD, error) {
md, err := w.ClientStream.Header()
if err != nil {
w.sendStreamEvent(errorEvent, err)
}
return md, err
}
func (w *clientStream) CloseSend() error {
err := w.ClientStream.CloseSend()
if err != nil {
w.sendStreamEvent(errorEvent, err)
} else {
w.sendStreamEvent(closeEvent, nil)
}
return err
}
const (
clientClosedState byte = 1 << iota
receiveEndedState
)
func wrapClientStream(s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream {
events := make(chan streamEvent)
eventsDone := make(chan struct{})
finished := make(chan error)
go func() {
defer close(eventsDone)
// Both streams have to be closed
state := byte(0)
for event := range events {
switch event.Type {
case closeEvent:
state |= clientClosedState
case receiveEndEvent:
state |= receiveEndedState
case errorEvent:
finished <- event.Err
return
}
if state == clientClosedState|receiveEndedState {
finished <- nil
return
}
}
}()
return &clientStream{
ClientStream: s,
desc: desc,
events: events,
eventsDone: eventsDone,
finished: finished,
}
}
func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
select {
case <-w.eventsDone:
case w.events <- streamEvent{Type: eventType, Err: err}:
}
}
// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
// for use in a grpc.Dial call.
//
// For example:
// tracer := global.Tracer("client-tracer")
// s := grpc.Dial(
// grpc.WithStreamInterceptor(grpctrace.StreamClientInterceptor(tracer)),
// ..., // (existing DialOptions))
func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor {
return func(
ctx context.Context,
desc *grpc.StreamDesc,
cc *grpc.ClientConn,
method string,
streamer grpc.Streamer,
opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
requestMetadata, _ := metadata.FromOutgoingContext(ctx)
metadataCopy := requestMetadata.Copy()
name, attr := spanInfo(method, cc.Target())
var span trace.Span
ctx, span = tracer.Start(
ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attr...),
)
Inject(ctx, &metadataCopy)
ctx = metadata.NewOutgoingContext(ctx, metadataCopy)
s, err := streamer(ctx, desc, cc, method, opts...)
stream := wrapClientStream(s, desc)
go func() {
if err == nil {
err = <-stream.finished
}
if err != nil {
s, _ := status.FromError(err)
span.SetStatus(s.Code(), s.Message())
}
span.End()
}()
return stream, err
}
}
// UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable
// for use in a grpc.NewServer call.
//
// For example:
// tracer := global.Tracer("client-tracer")
// s := grpc.Dial(
// grpc.UnaryInterceptor(grpctrace.UnaryServerInterceptor(tracer)),
// ..., // (existing ServerOptions))
func UnaryServerInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
requestMetadata, _ := metadata.FromIncomingContext(ctx)
metadataCopy := requestMetadata.Copy()
entries, spanCtx := Extract(ctx, &metadataCopy)
ctx = correlation.ContextWithMap(ctx, correlation.NewMap(correlation.MapUpdate{
MultiKV: entries,
}))
name, attr := spanInfo(info.FullMethod, peerFromCtx(ctx))
ctx, span := tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, spanCtx),
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attr...),
)
defer span.End()
messageReceived.Event(ctx, 1, req)
resp, err := handler(ctx, req)
if err != nil {
s, _ := status.FromError(err)
span.SetStatus(s.Code(), s.Message())
messageSent.Event(ctx, 1, s.Proto())
} else {
messageSent.Event(ctx, 1, resp)
}
return resp, err
}
}
// clientStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and
// SendMsg method call.
type serverStream struct {
grpc.ServerStream
ctx context.Context
receivedMessageID int
sentMessageID int
}
func (w *serverStream) Context() context.Context {
return w.ctx
}
func (w *serverStream) RecvMsg(m interface{}) error {
err := w.ServerStream.RecvMsg(m)
if err == nil {
w.receivedMessageID++
messageReceived.Event(w.Context(), w.receivedMessageID, m)
}
return err
}
func (w *serverStream) SendMsg(m interface{}) error {
err := w.ServerStream.SendMsg(m)
w.sentMessageID++
messageSent.Event(w.Context(), w.sentMessageID, m)
return err
}
func wrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream {
return &serverStream{
ServerStream: ss,
ctx: ctx,
}
}
// StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
// for use in a grpc.NewServer call.
//
// For example:
// tracer := global.Tracer("client-tracer")
// s := grpc.Dial(
// grpc.StreamInterceptor(grpctrace.StreamServerInterceptor(tracer)),
// ..., // (existing ServerOptions))
func StreamServerInterceptor(tracer trace.Tracer) grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
ctx := ss.Context()
requestMetadata, _ := metadata.FromIncomingContext(ctx)
metadataCopy := requestMetadata.Copy()
entries, spanCtx := Extract(ctx, &metadataCopy)
ctx = correlation.ContextWithMap(ctx, correlation.NewMap(correlation.MapUpdate{
MultiKV: entries,
}))
name, attr := spanInfo(info.FullMethod, peerFromCtx(ctx))
ctx, span := tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, spanCtx),
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attr...),
)
defer span.End()
err := handler(srv, wrapServerStream(ctx, ss))
if err != nil {
s, _ := status.FromError(err)
span.SetStatus(s.Code(), s.Message())
}
return err
}
}
// spanInfo returns a span name and all appropriate attributes from the gRPC
// method and peer address.
func spanInfo(fullMethod, peerAddress string) (string, []kv.KeyValue) {
attrs := []kv.KeyValue{standard.RPCSystemGRPC}
name, mAttrs := parseFullMethod(fullMethod)
attrs = append(attrs, mAttrs...)
attrs = append(attrs, peerAttr(peerAddress)...)
return name, attrs
}
// peerAttr returns attributes about the peer address.
func peerAttr(addr string) []kv.KeyValue {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return []kv.KeyValue(nil)
}
if host == "" {
host = "127.0.0.1"
}
return []kv.KeyValue{
standard.NetPeerIPKey.String(host),
standard.NetPeerPortKey.String(port),
}
}
// peerFromCtx returns a peer address from a context, if one exists.
func peerFromCtx(ctx context.Context) string {
p, ok := peer.FromContext(ctx)
if !ok {
return ""
}
return p.Addr.String()
}
// parseFullMethod returns a span name following the OpenTelemetry semantic
// conventions as well as all applicable span kv.KeyValue attributes based
// on a gRPC's FullMethod.
func parseFullMethod(fullMethod string) (string, []kv.KeyValue) {
name := strings.TrimLeft(fullMethod, "/")
parts := strings.SplitN(name, "/", 2)
if len(parts) != 2 {
// Invalid format, does not follow `/package.service/method`.
return name, []kv.KeyValue(nil)
}
var attrs []kv.KeyValue
if service := parts[0]; service != "" {
attrs = append(attrs, standard.RPCServiceKey.String(service))
}
if method := parts[1]; method != "" {
attrs = append(attrs, standard.RPCMethodKey.String(method))
}
return name, attrs
}