1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-03-05 15:05:51 +02:00

460 lines
11 KiB
Go
Raw Normal View History

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpctrace
// gRPC tracing middleware
// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/rpc.md
import (
"context"
"io"
"net"
"strings"
"go.opentelemetry.io/otel/api/standard"
"github.com/golang/protobuf/proto" //nolint:staticcheck
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"go.opentelemetry.io/otel/api/correlation"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/trace"
)
// messageType is a kv.KeyValue attribute that is attached to every "message"
// span event recorded by Event. The two values used in this file are
// messageSent and messageReceived.
type messageType kv.KeyValue
// Event records a "message" event on the span carried by ctx. The event is
// always tagged with the message type m and the message id. When message
// implements proto.Message, its uncompressed size is attached as well.
func (m messageType) Event(ctx context.Context, id int, message interface{}) {
	span := trace.SpanFromContext(ctx)

	// Attributes common to every message event.
	attrs := []kv.KeyValue{
		kv.KeyValue(m),
		standard.RPCMessageIDKey.Int(id),
	}

	// The size can only be computed for protobuf payloads.
	if pm, isProto := message.(proto.Message); isProto {
		attrs = append(attrs, standard.RPCMessageUncompressedSizeKey.Int(proto.Size(pm)))
	}

	span.AddEvent(ctx, "message", attrs...)
}
// messageSent and messageReceived are the messageType values passed to Event
// to tag span events for sent and received RPC messages, built from
// standard.RPCMessageTypeSent and standard.RPCMessageTypeReceived.
var (
messageSent = messageType(standard.RPCMessageTypeSent)
messageReceived = messageType(standard.RPCMessageTypeReceived)
)
// UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable
// for use in a grpc.Dial call.
//
// For every call the interceptor starts a client-kind span named after the
// method, injects the span context into a copy of the outgoing metadata,
// records a sent-message event for req and a received-message event for
// reply, and sets the span status from the returned error, if any.
func UnaryClientInterceptor(tracer trace.Tracer, opts ...Option) grpc.UnaryClientInterceptor {
	return func(
		ctx context.Context,
		method string,
		req, reply interface{},
		cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker,
		callOpts ...grpc.CallOption,
	) error {
		// Work on a copy so the caller's metadata is never mutated.
		outgoing, _ := metadata.FromOutgoingContext(ctx)
		md := outgoing.Copy()

		spanName, spanAttrs := spanInfo(method, cc.Target())
		ctx, span := tracer.Start(
			ctx,
			spanName,
			trace.WithSpanKind(trace.SpanKindClient),
			trace.WithAttributes(spanAttrs...),
		)
		defer span.End()

		Inject(ctx, &md, opts...)
		ctx = metadata.NewOutgoingContext(ctx, md)

		messageSent.Event(ctx, 1, req)
		callErr := invoker(ctx, method, req, reply, cc, callOpts...)
		messageReceived.Event(ctx, 1, reply)

		if callErr == nil {
			return nil
		}

		st, _ := status.FromError(callErr)
		span.SetStatus(st.Code(), st.Message())
		return callErr
	}
}
// streamEventType enumerates the kinds of lifecycle events a clientStream
// reports to its monitoring goroutine (see wrapClientStream).
type streamEventType int
// streamEvent is the value sent on clientStream.events. Err is only set by
// senders of errorEvent; the other event types are sent with a nil Err.
type streamEvent struct {
Type streamEventType
Err error
}
// Stream lifecycle events reported through clientStream.sendStreamEvent.
const (
// closeEvent is sent after CloseSend succeeds.
closeEvent streamEventType = iota
// receiveEndEvent is sent by RecvMsg on io.EOF, or after a successful
// receive when the stream descriptor is not server-streaming.
receiveEndEvent
// errorEvent is sent, together with the error, when a wrapped stream
// operation fails.
errorEvent
)
// clientStream wraps around the embedded grpc.ClientStream, and intercepts the
// RecvMsg, SendMsg, Header and CloseSend method calls so that messages are
// recorded as span events and stream termination can be detected.
type clientStream struct {
grpc.ClientStream
// desc is consulted by RecvMsg to tell whether the server streams responses.
desc *grpc.StreamDesc
// events carries lifecycle events to the goroutine started by wrapClientStream.
events chan streamEvent
// eventsDone is closed when that goroutine exits, so late senders do not block.
eventsDone chan struct{}
// finished receives the terminal result of the stream: the error carried by
// an errorEvent, or nil once both a closeEvent and a receiveEndEvent were seen.
finished chan error
// receivedMessageID and sentMessageID number the message events emitted by
// RecvMsg and SendMsg; each is incremented before its event is recorded.
receivedMessageID int
sentMessageID int
}
// Blank reference to proto.Marshal; it has no runtime effect.
// NOTE(review): presumably kept to pin the proto import. The package is
// already used by messageType.Event, so this may be redundant; confirm.
var _ = proto.Marshal
// RecvMsg delegates to the wrapped stream and reports the outcome:
//   - a successful receive on a stream that is not server-streaming, or an
//     io.EOF, signals that the receive side has ended;
//   - any other error is reported as a stream error;
//   - otherwise the message is counted and recorded as a received-message
//     event on the span.
//
// The error from the wrapped stream is returned unchanged.
func (w *clientStream) RecvMsg(m interface{}) error {
	err := w.ClientStream.RecvMsg(m)

	switch {
	case err == nil && !w.desc.ServerStreams, err == io.EOF:
		w.sendStreamEvent(receiveEndEvent, nil)
	case err != nil:
		w.sendStreamEvent(errorEvent, err)
	default:
		w.receivedMessageID++
		messageReceived.Event(w.Context(), w.receivedMessageID, m)
	}

	return err
}
// SendMsg delegates to the wrapped stream, then counts the message and records
// a sent-message event on the span regardless of the outcome. A failure is
// additionally reported as a stream error. The error from the wrapped stream
// is returned unchanged.
func (w *clientStream) SendMsg(m interface{}) error {
	sendErr := w.ClientStream.SendMsg(m)

	w.sentMessageID++
	messageSent.Event(w.Context(), w.sentMessageID, m)

	if sendErr == nil {
		return nil
	}
	w.sendStreamEvent(errorEvent, sendErr)
	return sendErr
}
// Header delegates to the wrapped stream and reports a failure as a stream
// error. The metadata and error from the wrapped stream are returned as-is.
func (w *clientStream) Header() (metadata.MD, error) {
	header, hdrErr := w.ClientStream.Header()
	if hdrErr != nil {
		w.sendStreamEvent(errorEvent, hdrErr)
	}
	return header, hdrErr
}
// CloseSend delegates to the wrapped stream. On success it signals that the
// client side has closed; on failure it reports a stream error. The error from
// the wrapped stream is returned unchanged.
func (w *clientStream) CloseSend() error {
	closeErr := w.ClientStream.CloseSend()
	if closeErr == nil {
		w.sendStreamEvent(closeEvent, nil)
		return nil
	}
	w.sendStreamEvent(errorEvent, closeErr)
	return closeErr
}
// Bit flags tracked by the goroutine in wrapClientStream. The stream is
// considered finished without error once both bits are set.
const (
// clientClosedState is set when a closeEvent has been observed.
clientClosedState byte = 1 << iota
// receiveEndedState is set when a receiveEndEvent has been observed.
receiveEndedState
)
// wrapClientStream wraps s in a clientStream and starts a goroutine that
// consumes the stream's lifecycle events. The goroutine publishes exactly one
// value on the finished channel and then exits, closing eventsDone:
//   - the error carried by the first errorEvent, or
//   - nil once both a closeEvent and a receiveEndEvent have been seen.
func wrapClientStream(s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream {
	var (
		eventCh  = make(chan streamEvent)
		doneCh   = make(chan struct{})
		resultCh = make(chan error)
	)

	go func() {
		defer close(doneCh)

		// Both directions of the stream have to be closed.
		const bothEnded = clientClosedState | receiveEndedState

		var state byte
		for ev := range eventCh {
			if ev.Type == errorEvent {
				resultCh <- ev.Err
				return
			}

			if ev.Type == closeEvent {
				state |= clientClosedState
			} else if ev.Type == receiveEndEvent {
				state |= receiveEndedState
			}

			if state == bothEnded {
				resultCh <- nil
				return
			}
		}
	}()

	return &clientStream{
		ClientStream: s,
		desc:         desc,
		events:       eventCh,
		eventsDone:   doneCh,
		finished:     resultCh,
	}
}
// sendStreamEvent delivers an event to the monitoring goroutine. If that
// goroutine has already exited (eventsDone is closed), the event is dropped
// instead of blocking the caller.
func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
	ev := streamEvent{Type: eventType, Err: err}
	select {
	case w.events <- ev:
		// Delivered.
	case <-w.eventsDone:
		// Nobody is listening any more.
	}
}
// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
// for use in a grpc.Dial call.
//
// For every stream the interceptor starts a client-kind span named after the
// method and injects the span context into a copy of the outgoing metadata.
//
// If the stream cannot be created, the span status is set from the error and
// the span is ended immediately; the streamer's result is returned unchanged
// and no wrapper or background goroutine is created.
//
// Otherwise the stream is wrapped in a clientStream and a goroutine waits for
// it to finish (an error on any wrapped operation, or both CloseSend and the
// end of the receive side), sets the span status on error and ends the span.
func StreamClientInterceptor(tracer trace.Tracer, opts ...Option) grpc.StreamClientInterceptor {
	return func(
		ctx context.Context,
		desc *grpc.StreamDesc,
		cc *grpc.ClientConn,
		method string,
		streamer grpc.Streamer,
		callOpts ...grpc.CallOption,
	) (grpc.ClientStream, error) {
		// Work on a copy so the caller's metadata is never mutated.
		requestMetadata, _ := metadata.FromOutgoingContext(ctx)
		metadataCopy := requestMetadata.Copy()

		name, attr := spanInfo(method, cc.Target())
		var span trace.Span
		ctx, span = tracer.Start(
			ctx,
			name,
			trace.WithSpanKind(trace.SpanKindClient),
			trace.WithAttributes(attr...),
		)

		Inject(ctx, &metadataCopy, opts...)
		ctx = metadata.NewOutgoingContext(ctx, metadataCopy)

		s, err := streamer(ctx, desc, cc, method, callOpts...)
		if err != nil {
			// Stream creation failed: finish the span synchronously. Wrapping
			// the (nil) stream here would start a monitoring goroutine that
			// is never signalled and therefore never exits.
			st, _ := status.FromError(err)
			span.SetStatus(st.Code(), st.Message())
			span.End()
			return s, err
		}

		stream := wrapClientStream(s, desc)

		go func() {
			// Use a goroutine-local variable; sharing the outer err with the
			// returning function would be a data race.
			if finishErr := <-stream.finished; finishErr != nil {
				st, _ := status.FromError(finishErr)
				span.SetStatus(st.Code(), st.Message())
			}
			span.End()
		}()

		return stream, nil
	}
}
// UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable
// for use in a grpc.NewServer call.
//
// For every call the interceptor extracts the remote span context and
// correlation entries from a copy of the incoming metadata, starts a
// server-kind span, records a received-message event for the request, invokes
// the handler and records a sent-message event for either the response or, on
// error, the status proto. On error the span status is set as well.
func UnaryServerInterceptor(tracer trace.Tracer, opts ...Option) grpc.UnaryServerInterceptor {
	return func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler,
	) (interface{}, error) {
		incoming, _ := metadata.FromIncomingContext(ctx)
		md := incoming.Copy()

		entries, remoteSpanCtx := Extract(ctx, &md, opts...)
		update := correlation.MapUpdate{MultiKV: entries}
		ctx = correlation.ContextWithMap(ctx, correlation.NewMap(update))

		spanName, spanAttrs := spanInfo(info.FullMethod, peerFromCtx(ctx))
		ctx, span := tracer.Start(
			trace.ContextWithRemoteSpanContext(ctx, remoteSpanCtx),
			spanName,
			trace.WithSpanKind(trace.SpanKindServer),
			trace.WithAttributes(spanAttrs...),
		)
		defer span.End()

		messageReceived.Event(ctx, 1, req)

		resp, err := handler(ctx, req)
		if err == nil {
			messageSent.Event(ctx, 1, resp)
			return resp, nil
		}

		// On failure the status proto is what is reported as the sent message.
		st, _ := status.FromError(err)
		span.SetStatus(st.Code(), st.Message())
		messageSent.Event(ctx, 1, st.Proto())
		return resp, err
	}
}
// serverStream wraps around the embedded grpc.ServerStream, and intercepts the
// RecvMsg and SendMsg method calls to record message events. It also overrides
// Context so handlers observe the context stored in ctx.
type serverStream struct {
grpc.ServerStream
// ctx is the context returned by Context instead of the embedded stream's.
ctx context.Context
// receivedMessageID and sentMessageID number the message events emitted by
// RecvMsg and SendMsg; each is incremented before its event is recorded.
receivedMessageID int
sentMessageID int
}
// Context returns the context stored in the wrapper, shadowing the Context
// method of the embedded grpc.ServerStream.
func (w *serverStream) Context() context.Context {
return w.ctx
}
// RecvMsg delegates to the wrapped stream. On success the message is counted
// and recorded as a received-message event; on failure the error is returned
// without recording anything.
func (w *serverStream) RecvMsg(m interface{}) error {
	if recvErr := w.ServerStream.RecvMsg(m); recvErr != nil {
		return recvErr
	}

	w.receivedMessageID++
	messageReceived.Event(w.Context(), w.receivedMessageID, m)
	return nil
}
// SendMsg delegates to the wrapped stream, then counts the message and records
// a sent-message event regardless of the outcome. The error from the wrapped
// stream is returned unchanged.
func (w *serverStream) SendMsg(m interface{}) error {
	sendErr := w.ServerStream.SendMsg(m)

	w.sentMessageID++
	id := w.sentMessageID
	messageSent.Event(w.Context(), id, m)

	return sendErr
}
// wrapServerStream returns a serverStream that embeds ss and reports ctx from
// its Context method. Both message counters start at zero.
func wrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream {
	wrapped := new(serverStream)
	wrapped.ServerStream = ss
	wrapped.ctx = ctx
	return wrapped
}
// StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
// for use in a grpc.NewServer call.
//
// For every stream the interceptor extracts the remote span context and
// correlation entries from a copy of the incoming metadata, starts a
// server-kind span, and runs the handler against a wrapped stream that
// carries the span's context. The span status is set when the handler
// returns an error, and the span ends when the handler returns.
func StreamServerInterceptor(tracer trace.Tracer, opts ...Option) grpc.StreamServerInterceptor {
	return func(
		srv interface{},
		ss grpc.ServerStream,
		info *grpc.StreamServerInfo,
		handler grpc.StreamHandler,
	) error {
		ctx := ss.Context()

		incoming, _ := metadata.FromIncomingContext(ctx)
		md := incoming.Copy()

		entries, remoteSpanCtx := Extract(ctx, &md, opts...)
		update := correlation.MapUpdate{MultiKV: entries}
		ctx = correlation.ContextWithMap(ctx, correlation.NewMap(update))

		spanName, spanAttrs := spanInfo(info.FullMethod, peerFromCtx(ctx))
		ctx, span := tracer.Start(
			trace.ContextWithRemoteSpanContext(ctx, remoteSpanCtx),
			spanName,
			trace.WithSpanKind(trace.SpanKindServer),
			trace.WithAttributes(spanAttrs...),
		)
		defer span.End()

		handlerErr := handler(srv, wrapServerStream(ctx, ss))
		if handlerErr == nil {
			return nil
		}

		st, _ := status.FromError(handlerErr)
		span.SetStatus(st.Code(), st.Message())
		return handlerErr
	}
}
// spanInfo derives the span name and attribute set for an RPC. The attributes
// are, in order: the gRPC system attribute, whatever parseFullMethod yields
// for fullMethod, and whatever peerAttr yields for peerAddress.
func spanInfo(fullMethod, peerAddress string) (string, []kv.KeyValue) {
	spanName, methodAttrs := parseFullMethod(fullMethod)

	result := []kv.KeyValue{standard.RPCSystemGRPC}
	result = append(result, methodAttrs...)
	result = append(result, peerAttr(peerAddress)...)

	return spanName, result
}
// peerAttr splits addr into host and port and returns them as peer IP and
// peer port attributes. An address that cannot be split yields a nil slice;
// an empty host is reported as "127.0.0.1".
func peerAttr(addr string) []kv.KeyValue {
	peerHost, peerPort, splitErr := net.SplitHostPort(addr)
	if splitErr != nil {
		return nil
	}

	if len(peerHost) == 0 {
		peerHost = "127.0.0.1"
	}

	ipAttr := standard.NetPeerIPKey.String(peerHost)
	portAttr := standard.NetPeerPortKey.String(peerPort)
	return []kv.KeyValue{ipAttr, portAttr}
}
// peerFromCtx returns the string form of the peer address stored in ctx, or
// the empty string when ctx carries no peer information.
func peerFromCtx(ctx context.Context) string {
	if remote, found := peer.FromContext(ctx); found {
		return remote.Addr.String()
	}
	return ""
}
// parseFullMethod turns a gRPC FullMethod into a span name and attributes.
//
// The span name is fullMethod with all leading slashes removed. If what is
// left contains a "/", the text before the first one is reported as the RPC
// service and the text after it as the RPC method; empty parts are omitted.
// If there is no "/", the input does not follow `/package.service/method`
// and no attributes are returned.
func parseFullMethod(fullMethod string) (string, []kv.KeyValue) {
	name := strings.TrimLeft(fullMethod, "/")

	sep := strings.Index(name, "/")
	if sep < 0 {
		// Invalid format, does not follow `/package.service/method`.
		return name, nil
	}
	service, method := name[:sep], name[sep+1:]

	var attrs []kv.KeyValue
	if service != "" {
		attrs = append(attrs, standard.RPCServiceKey.String(service))
	}
	if method != "" {
		attrs = append(attrs, standard.RPCMethodKey.String(method))
	}
	return name, attrs
}