diff --git a/example/grpc/api/hello-service.pb.go b/example/grpc/api/hello-service.pb.go index bf665c923..e6ec648f8 100644 --- a/example/grpc/api/hello-service.pb.go +++ b/example/grpc/api/hello-service.pb.go @@ -1,16 +1,25 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // source: hello-service.proto +/* +Package api is a generated protocol buffer package. + +It is generated from these files: + hello-service.proto + +It has these top-level messages: + HelloRequest + HelloResponse +*/ package api +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + import ( - context "context" - fmt "fmt" - proto "github.com/golang/protobuf/proto" + context "golang.org/x/net/context" grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" - math "math" ) // Reference imports to suppress errors if they are not otherwise used. @@ -22,39 +31,16 @@ var _ = math.Inf // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package type HelloRequest struct { - Greeting string `protobuf:"bytes,1,opt,name=greeting,proto3" json:"greeting,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Greeting string `protobuf:"bytes,1,opt,name=greeting" json:"greeting,omitempty"` } -func (m *HelloRequest) Reset() { *m = HelloRequest{} } -func (m *HelloRequest) String() string { return proto.CompactTextString(m) } -func (*HelloRequest) ProtoMessage() {} -func (*HelloRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_b5bbcd7ae0220f22, []int{0} -} - -func (m *HelloRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_HelloRequest.Unmarshal(m, b) -} -func (m *HelloRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_HelloRequest.Marshal(b, m, deterministic) -} -func (m *HelloRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_HelloRequest.Merge(m, src) -} -func (m *HelloRequest) XXX_Size() int { - return xxx_messageInfo_HelloRequest.Size(m) -} -func (m *HelloRequest) XXX_DiscardUnknown() { - xxx_messageInfo_HelloRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_HelloRequest proto.InternalMessageInfo +func (m *HelloRequest) Reset() { *m = HelloRequest{} } +func (m *HelloRequest) String() string { return proto.CompactTextString(m) } +func (*HelloRequest) ProtoMessage() {} +func (*HelloRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } func (m *HelloRequest) GetGreeting() string { if m != nil { @@ -64,36 +50,13 @@ func (m *HelloRequest) GetGreeting() string { } type HelloResponse struct { - Reply string `protobuf:"bytes,1,opt,name=reply,proto3" json:"reply,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Reply string `protobuf:"bytes,1,opt,name=reply" json:"reply,omitempty"` } -func (m *HelloResponse) Reset() { *m = HelloResponse{} } -func (m *HelloResponse) String() string { return proto.CompactTextString(m) } -func (*HelloResponse) ProtoMessage() {} -func (*HelloResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_b5bbcd7ae0220f22, []int{1} -} - -func (m *HelloResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_HelloResponse.Unmarshal(m, b) -} -func (m *HelloResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_HelloResponse.Marshal(b, m, deterministic) -} -func (m *HelloResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_HelloResponse.Merge(m, src) -} -func (m *HelloResponse) XXX_Size() int { - return xxx_messageInfo_HelloResponse.Size(m) -} -func (m *HelloResponse) XXX_DiscardUnknown() { - xxx_messageInfo_HelloResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_HelloResponse proto.InternalMessageInfo +func (m *HelloResponse) Reset() { *m = HelloResponse{} } +func (m *HelloResponse) String() string { return proto.CompactTextString(m) } +func (*HelloResponse) ProtoMessage() {} +func (*HelloResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } func (m *HelloResponse) GetReply() string { if m != nil { @@ -107,22 +70,6 @@ func init() { proto.RegisterType((*HelloResponse)(nil), "api.HelloResponse") } -func init() { proto.RegisterFile("hello-service.proto", fileDescriptor_b5bbcd7ae0220f22) } - -var fileDescriptor_b5bbcd7ae0220f22 = []byte{ - // 146 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0xce, 0x48, 0xcd, 0xc9, - 0xc9, 0xd7, 0x2d, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, - 0x62, 0x4e, 0x2c, 0xc8, 0x54, 0xd2, 0xe2, 0xe2, 0xf1, 0x00, 0xc9, 0x05, 0xa5, 0x16, 0x96, 0xa6, - 0x16, 0x97, 0x08, 0x49, 0x71, 0x71, 0xa4, 0x17, 0xa5, 0xa6, 0x96, 0x64, 0xe6, 0xa5, 0x4b, 0x30, - 0x2a, 0x30, 0x6a, 0x70, 0x06, 0xc1, 0xf9, 0x4a, 0xaa, 0x5c, 0xbc, 0x50, 0xb5, 0xc5, 0x05, 0xf9, - 0x79, 0xc5, 0xa9, 0x42, 0x22, 0x5c, 0xac, 0x45, 0xa9, 0x05, 0x39, 0x95, 0x50, 0x95, 0x10, 0x8e, - 0x91, 0x23, 0xd4, 0xc8, 0x60, 0x88, 0x6d, 0x42, 0x86, 0x5c, 0x1c, 0xc1, 0x89, 0x95, 0x60, 0x21, - 0x21, 0x41, 0xbd, 0xc4, 0x82, 0x4c, 0x3d, 0x64, 0x1b, 0xa5, 0x84, 0x90, 0x85, 0x20, 0x06, 0x27, - 0xb1, 0x81, 0x5d, 0x68, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0xce, 0x48, 0xd8, 0xe7, 0xb8, 0x00, - 0x00, 0x00, -} - // Reference imports to suppress errors if they are not otherwise used. var _ context.Context var _ grpc.ClientConn @@ -131,11 +78,13 @@ var _ grpc.ClientConn // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion4 -// HelloServiceClient is the client API for HelloService service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +// Client API for HelloService service + type HelloServiceClient interface { SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error) + SayHelloServerStream(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (HelloService_SayHelloServerStreamClient, error) + SayHelloClientStream(ctx context.Context, opts ...grpc.CallOption) (HelloService_SayHelloClientStreamClient, error) + SayHelloBidiStream(ctx context.Context, opts ...grpc.CallOption) (HelloService_SayHelloBidiStreamClient, error) } type helloServiceClient struct { @@ -148,24 +97,117 @@ func NewHelloServiceClient(cc *grpc.ClientConn) HelloServiceClient { func (c *helloServiceClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error) { out := new(HelloResponse) - err := c.cc.Invoke(ctx, "/api.HelloService/SayHello", in, out, opts...) + err := grpc.Invoke(ctx, "/api.HelloService/SayHello", in, out, c.cc, opts...) if err != nil { return nil, err } return out, nil } -// HelloServiceServer is the server API for HelloService service. +func (c *helloServiceClient) SayHelloServerStream(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (HelloService_SayHelloServerStreamClient, error) { + stream, err := grpc.NewClientStream(ctx, &_HelloService_serviceDesc.Streams[0], c.cc, "/api.HelloService/SayHelloServerStream", opts...) + if err != nil { + return nil, err + } + x := &helloServiceSayHelloServerStreamClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type HelloService_SayHelloServerStreamClient interface { + Recv() (*HelloResponse, error) + grpc.ClientStream +} + +type helloServiceSayHelloServerStreamClient struct { + grpc.ClientStream +} + +func (x *helloServiceSayHelloServerStreamClient) Recv() (*HelloResponse, error) { + m := new(HelloResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *helloServiceClient) SayHelloClientStream(ctx context.Context, opts ...grpc.CallOption) (HelloService_SayHelloClientStreamClient, error) { + stream, err := grpc.NewClientStream(ctx, &_HelloService_serviceDesc.Streams[1], c.cc, "/api.HelloService/SayHelloClientStream", opts...) + if err != nil { + return nil, err + } + x := &helloServiceSayHelloClientStreamClient{stream} + return x, nil +} + +type HelloService_SayHelloClientStreamClient interface { + Send(*HelloRequest) error + CloseAndRecv() (*HelloResponse, error) + grpc.ClientStream +} + +type helloServiceSayHelloClientStreamClient struct { + grpc.ClientStream +} + +func (x *helloServiceSayHelloClientStreamClient) Send(m *HelloRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *helloServiceSayHelloClientStreamClient) CloseAndRecv() (*HelloResponse, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(HelloResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *helloServiceClient) SayHelloBidiStream(ctx context.Context, opts ...grpc.CallOption) (HelloService_SayHelloBidiStreamClient, error) { + stream, err := grpc.NewClientStream(ctx, &_HelloService_serviceDesc.Streams[2], c.cc, "/api.HelloService/SayHelloBidiStream", opts...) + if err != nil { + return nil, err + } + x := &helloServiceSayHelloBidiStreamClient{stream} + return x, nil +} + +type HelloService_SayHelloBidiStreamClient interface { + Send(*HelloRequest) error + Recv() (*HelloResponse, error) + grpc.ClientStream +} + +type helloServiceSayHelloBidiStreamClient struct { + grpc.ClientStream +} + +func (x *helloServiceSayHelloBidiStreamClient) Send(m *HelloRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *helloServiceSayHelloBidiStreamClient) Recv() (*HelloResponse, error) { + m := new(HelloResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Server API for HelloService service + type HelloServiceServer interface { SayHello(context.Context, *HelloRequest) (*HelloResponse, error) -} - -// UnimplementedHelloServiceServer can be embedded to have forward compatible implementations. -type UnimplementedHelloServiceServer struct { -} - -func (*UnimplementedHelloServiceServer) SayHello(ctx context.Context, req *HelloRequest) (*HelloResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method SayHello not implemented") + SayHelloServerStream(*HelloRequest, HelloService_SayHelloServerStreamServer) error + SayHelloClientStream(HelloService_SayHelloClientStreamServer) error + SayHelloBidiStream(HelloService_SayHelloBidiStreamServer) error } func RegisterHelloServiceServer(s *grpc.Server, srv HelloServiceServer) { @@ -190,6 +232,79 @@ func _HelloService_SayHello_Handler(srv interface{}, ctx context.Context, dec fu return interceptor(ctx, in, info, handler) } +func _HelloService_SayHelloServerStream_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(HelloRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(HelloServiceServer).SayHelloServerStream(m, &helloServiceSayHelloServerStreamServer{stream}) +} + +type HelloService_SayHelloServerStreamServer interface { + Send(*HelloResponse) error + grpc.ServerStream +} + +type helloServiceSayHelloServerStreamServer struct { + grpc.ServerStream +} + +func (x *helloServiceSayHelloServerStreamServer) Send(m *HelloResponse) error { + return x.ServerStream.SendMsg(m) +} + +func _HelloService_SayHelloClientStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(HelloServiceServer).SayHelloClientStream(&helloServiceSayHelloClientStreamServer{stream}) +} + +type HelloService_SayHelloClientStreamServer interface { + SendAndClose(*HelloResponse) error + Recv() (*HelloRequest, error) + grpc.ServerStream +} + +type helloServiceSayHelloClientStreamServer struct { + grpc.ServerStream +} + +func (x *helloServiceSayHelloClientStreamServer) SendAndClose(m *HelloResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *helloServiceSayHelloClientStreamServer) Recv() (*HelloRequest, error) { + m := new(HelloRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _HelloService_SayHelloBidiStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(HelloServiceServer).SayHelloBidiStream(&helloServiceSayHelloBidiStreamServer{stream}) +} + +type HelloService_SayHelloBidiStreamServer interface { + Send(*HelloResponse) error + Recv() (*HelloRequest, error) + grpc.ServerStream +} + +type helloServiceSayHelloBidiStreamServer struct { + grpc.ServerStream +} + +func (x *helloServiceSayHelloBidiStreamServer) Send(m *HelloResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *helloServiceSayHelloBidiStreamServer) Recv() (*HelloRequest, error) { + m := new(HelloRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + var _HelloService_serviceDesc = grpc.ServiceDesc{ ServiceName: "api.HelloService", HandlerType: (*HelloServiceServer)(nil), @@ -199,6 +314,41 @@ var _HelloService_serviceDesc = grpc.ServiceDesc{ Handler: _HelloService_SayHello_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "SayHelloServerStream", + Handler: _HelloService_SayHelloServerStream_Handler, + ServerStreams: true, + }, + { + StreamName: "SayHelloClientStream", + Handler: _HelloService_SayHelloClientStream_Handler, + ClientStreams: true, + }, + { + StreamName: "SayHelloBidiStream", + Handler: _HelloService_SayHelloBidiStream_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, Metadata: "hello-service.proto", } + +func init() { proto.RegisterFile("hello-service.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 192 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0xce, 0x48, 0xcd, 0xc9, + 0xc9, 0xd7, 0x2d, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, + 0x62, 0x4e, 0x2c, 0xc8, 0x54, 0xd2, 0xe2, 0xe2, 0xf1, 0x00, 0xc9, 0x05, 0xa5, 0x16, 0x96, 0xa6, + 0x16, 0x97, 0x08, 0x49, 0x71, 0x71, 0xa4, 0x17, 0xa5, 0xa6, 0x96, 0x64, 0xe6, 0xa5, 0x4b, 0x30, + 0x2a, 0x30, 0x6a, 0x70, 0x06, 0xc1, 0xf9, 0x4a, 0xaa, 0x5c, 0xbc, 0x50, 0xb5, 0xc5, 0x05, 0xf9, + 0x79, 0xc5, 0xa9, 0x42, 0x22, 0x5c, 0xac, 0x45, 0xa9, 0x05, 0x39, 0x95, 0x50, 0x95, 0x10, 0x8e, + 0x51, 0x0b, 0x13, 0xd4, 0xcc, 0x60, 0x88, 0x75, 0x42, 0x86, 0x5c, 0x1c, 0xc1, 0x89, 0x95, 0x60, + 0x21, 0x21, 0x41, 0xbd, 0xc4, 0x82, 0x4c, 0x3d, 0x64, 0x2b, 0xa5, 0x84, 0x90, 0x85, 0xa0, 0x26, + 0xdb, 0x73, 0x89, 0xc0, 0xb4, 0x80, 0x4c, 0x49, 0x2d, 0x0a, 0x2e, 0x29, 0x4a, 0x4d, 0xcc, 0x25, + 0x52, 0xbb, 0x01, 0x23, 0xb2, 0x01, 0xce, 0x39, 0x99, 0xa9, 0x79, 0x25, 0x24, 0x19, 0xa0, 0x01, + 0x32, 0x40, 0x08, 0x66, 0x80, 0x53, 0x66, 0x4a, 0x26, 0x89, 0xda, 0x0d, 0x18, 0x93, 0xd8, 0xc0, + 0xa1, 0x6c, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x0e, 0xd5, 0x1c, 0xd2, 0x7c, 0x01, 0x00, 0x00, +} diff --git a/example/grpc/api/hello-service.proto b/example/grpc/api/hello-service.proto index ecdafc79d..699645127 100644 --- a/example/grpc/api/hello-service.proto +++ b/example/grpc/api/hello-service.proto @@ -17,6 +17,12 @@ package api; service HelloService { rpc SayHello (HelloRequest) returns (HelloResponse); + + rpc SayHelloServerStream (HelloRequest) returns (stream HelloResponse); + + rpc SayHelloClientStream (stream HelloRequest) returns (HelloResponse); + + rpc SayHelloBidiStream (stream HelloRequest) returns (stream HelloResponse); } message HelloRequest { diff --git a/example/grpc/client/main.go b/example/grpc/client/main.go index 3cba9e7b2..6131c0bf9 100644 --- a/example/grpc/client/main.go +++ b/example/grpc/client/main.go @@ -16,23 +16,28 @@ package main import ( "context" + "io" "log" "time" + "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/example/grpc/api" "go.opentelemetry.io/otel/example/grpc/config" + "go.opentelemetry.io/otel/plugin/grpctrace" "google.golang.org/grpc" "google.golang.org/grpc/metadata" - - "go.opentelemetry.io/otel/example/grpc/middleware/tracing" ) func main() { config.Init() var conn *grpc.ClientConn - conn, err := grpc.Dial(":7777", grpc.WithInsecure(), grpc.WithUnaryInterceptor(tracing.UnaryClientInterceptor)) + conn, err := grpc.Dial(":7777", grpc.WithInsecure(), + grpc.WithUnaryInterceptor(grpctrace.UnaryClientInterceptor(global.Tracer(""))), + grpc.WithStreamInterceptor(grpctrace.StreamClientInterceptor(global.Tracer(""))), + ) + if err != nil { log.Fatalf("did not connect: %s", err) } @@ -40,11 +45,21 @@ func main() { c := api.NewHelloServiceClient(conn) + callSayHello(c) + callSayHelloClientStream(c) + callSayHelloServerStream(c) + callSayHelloBidiStream(c) + + time.Sleep(10 * time.Millisecond) +} + +func callSayHello(c api.HelloServiceClient) { md := metadata.Pairs( "timestamp", time.Now().Format(time.StampNano), "client-id", "web-api-client-us-east-1", "user-id", "some-test-user-id", ) + ctx := metadata.NewOutgoingContext(context.Background(), md) response, err := c.SayHello(ctx, &api.HelloRequest{Greeting: "World"}) if err != nil { @@ -52,3 +67,116 @@ func main() { } log.Printf("Response from server: %s", response.Reply) } + +func callSayHelloClientStream(c api.HelloServiceClient) { + md := metadata.Pairs( + "timestamp", time.Now().Format(time.StampNano), + "client-id", "web-api-client-us-east-1", + "user-id", "some-test-user-id", + ) + + ctx := metadata.NewOutgoingContext(context.Background(), md) + stream, err := c.SayHelloClientStream(ctx) + if err != nil { + log.Fatalf("Error when opening SayHelloClientStream: %s", err) + } + + for i := 0; i < 5; i++ { + err := stream.Send(&api.HelloRequest{Greeting: "World"}) + + time.Sleep(time.Duration(i*50) * time.Millisecond) + + if err != nil { + log.Fatalf("Error when sending to SayHelloClientStream: %s", err) + } + } + + response, err := stream.CloseAndRecv() + if err != nil { + log.Fatalf("Error when closing SayHelloClientStream: %s", err) + } + + log.Printf("Response from server: %s", response.Reply) +} + +func callSayHelloServerStream(c api.HelloServiceClient) { + md := metadata.Pairs( + "timestamp", time.Now().Format(time.StampNano), + "client-id", "web-api-client-us-east-1", + "user-id", "some-test-user-id", + ) + + ctx := metadata.NewOutgoingContext(context.Background(), md) + stream, err := c.SayHelloServerStream(ctx, &api.HelloRequest{Greeting: "World"}) + if err != nil { + log.Fatalf("Error when opening SayHelloServerStream: %s", err) + } + + for { + response, err := stream.Recv() + if err == io.EOF { + break + } else if err != nil { + log.Fatalf("Error when receiving from SayHelloServerStream: %s", err) + } + + log.Printf("Response from server: %s", response.Reply) + time.Sleep(50 * time.Millisecond) + } +} + +func callSayHelloBidiStream(c api.HelloServiceClient) { + md := metadata.Pairs( + "timestamp", time.Now().Format(time.StampNano), + "client-id", "web-api-client-us-east-1", + "user-id", "some-test-user-id", + ) + + ctx := metadata.NewOutgoingContext(context.Background(), md) + stream, err := c.SayHelloBidiStream(ctx) + if err != nil { + log.Fatalf("Error when opening SayHelloBidiStream: %s", err) + } + + serverClosed := make(chan struct{}) + clientClosed := make(chan struct{}) + + go func() { + for i := 0; i < 5; i++ { + err := stream.Send(&api.HelloRequest{Greeting: "World"}) + + if err != nil { + log.Fatalf("Error when sending to SayHelloBidiStream: %s", err) + } + + time.Sleep(50 * time.Millisecond) + } + + err := stream.CloseSend() + if err != nil { + log.Fatalf("Error when closing SayHelloBidiStream: %s", err) + } + + clientClosed <- struct{}{} + }() + + go func() { + for { + response, err := stream.Recv() + if err == io.EOF { + break + } else if err != nil { + log.Fatalf("Error when receiving from SayHelloBidiStream: %s", err) + } + + log.Printf("Response from server: %s", response.Reply) + time.Sleep(50 * time.Millisecond) + } + + serverClosed <- struct{}{} + }() + + // Wait until client and server both closed the connection. + <-clientClosed + <-serverClosed +} diff --git a/example/grpc/go.mod b/example/grpc/go.mod index 5689d8926..4299130bb 100644 --- a/example/grpc/go.mod +++ b/example/grpc/go.mod @@ -7,5 +7,6 @@ replace go.opentelemetry.io/otel => ../.. require ( github.com/golang/protobuf v1.3.2 go.opentelemetry.io/otel v0.4.2 + golang.org/x/net v0.0.0-20190311183353-d8887717615a google.golang.org/grpc v1.27.1 ) diff --git a/example/grpc/middleware/tracing/tracing.go b/example/grpc/middleware/tracing/tracing.go deleted file mode 100644 index d6a17dff8..000000000 --- a/example/grpc/middleware/tracing/tracing.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tracing - -// gRPC tracing middleware -// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/data-rpc.md -import ( - "context" - - "go.opentelemetry.io/otel/plugin/grpctrace" - - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" - - "go.opentelemetry.io/otel/api/core" - "go.opentelemetry.io/otel/api/correlation" - "go.opentelemetry.io/otel/api/global" - "go.opentelemetry.io/otel/api/key" - "go.opentelemetry.io/otel/api/trace" -) - -// UnaryServerInterceptor intercepts and extracts incoming trace data -func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - requestMetadata, _ := metadata.FromIncomingContext(ctx) - metadataCopy := requestMetadata.Copy() - - entries, spanCtx := grpctrace.Extract(ctx, &metadataCopy) - ctx = correlation.ContextWithMap(ctx, correlation.NewMap(correlation.MapUpdate{ - MultiKV: entries, - })) - - grpcServerKey := key.New("grpc.server") - serverSpanAttrs := []core.KeyValue{ - grpcServerKey.String("hello-world-server"), - } - - tr := global.Tracer("example/grpc") - ctx, span := tr.Start( - trace.ContextWithRemoteSpanContext(ctx, spanCtx), - "hello-api-op", - trace.WithAttributes(serverSpanAttrs...), - trace.WithSpanKind(trace.SpanKindServer), - ) - defer span.End() - - return handler(ctx, req) -} - -// UnaryClientInterceptor intercepts and injects outgoing trace -func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - requestMetadata, _ := metadata.FromOutgoingContext(ctx) - metadataCopy := requestMetadata.Copy() - - tr := global.Tracer("example/grpc") - err := tr.WithSpan(ctx, "hello-api-op", - func(ctx context.Context) error { - grpctrace.Inject(ctx, &metadataCopy) - ctx = metadata.NewOutgoingContext(ctx, metadataCopy) - - err := invoker(ctx, method, req, reply, cc, opts...) - setTraceStatus(ctx, err) - return err - }) - return err -} - -func setTraceStatus(ctx context.Context, err error) { - if err != nil { - s, _ := status.FromError(err) - trace.SpanFromContext(ctx).SetStatus(s.Code(), s.Message()) - } -} diff --git a/example/grpc/server/main.go b/example/grpc/server/main.go index eb17e0a99..d23da5937 100644 --- a/example/grpc/server/main.go +++ b/example/grpc/server/main.go @@ -16,15 +16,18 @@ package main import ( "context" + "fmt" + "io" "log" "net" + "time" + "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/example/grpc/api" "go.opentelemetry.io/otel/example/grpc/config" + "go.opentelemetry.io/otel/plugin/grpctrace" "google.golang.org/grpc" - - "go.opentelemetry.io/otel/example/grpc/middleware/tracing" ) const ( @@ -33,15 +36,78 @@ const ( // server is used to implement api.HelloServiceServer type server struct { - api.UnimplementedHelloServiceServer + api.HelloServiceServer } // SayHello implements api.HelloServiceServer func (s *server) SayHello(ctx context.Context, in *api.HelloRequest) (*api.HelloResponse, error) { - log.Printf("Received: %v", in.GetGreeting()) + log.Printf("Received: %v\n", in.GetGreeting()) + time.Sleep(50 * time.Millisecond) + return &api.HelloResponse{Reply: "Hello " + in.Greeting}, nil } +func (s *server) SayHelloServerStream(in *api.HelloRequest, out api.HelloService_SayHelloServerStreamServer) error { + log.Printf("Received: %v\n", in.GetGreeting()) + + for i := 0; i < 5; i++ { + err := out.Send(&api.HelloResponse{Reply: "Hello " + in.Greeting}) + if err != nil { + return err + } + + time.Sleep(time.Duration(i*50) * time.Millisecond) + } + + return nil +} + +func (s *server) SayHelloClientStream(stream api.HelloService_SayHelloClientStreamServer) error { + i := 0 + + for { + in, err := stream.Recv() + + if err == io.EOF { + break + } else if err != nil { + log.Printf("Non EOF error: %v\n", err) + return err + } + + log.Printf("Received: %v\n", in.GetGreeting()) + i++ + } + + time.Sleep(50 * time.Millisecond) + + return stream.SendAndClose(&api.HelloResponse{Reply: fmt.Sprintf("Hello (%v times)", i)}) +} + +func (s *server) SayHelloBidiStream(stream api.HelloService_SayHelloBidiStreamServer) error { + for { + in, err := stream.Recv() + + if err == io.EOF { + break + } else if err != nil { + log.Printf("Non EOF error: %v\n", err) + return err + } + + time.Sleep(50 * time.Millisecond) + + log.Printf("Received: %v\n", in.GetGreeting()) + err = stream.Send(&api.HelloResponse{Reply: "Hello " + in.Greeting}) + + if err != nil { + return err + } + } + + return nil +} + func main() { config.Init() @@ -50,7 +116,10 @@ func main() { log.Fatalf("failed to listen: %v", err) } - s := grpc.NewServer(grpc.UnaryInterceptor(tracing.UnaryServerInterceptor)) + s := grpc.NewServer( + grpc.UnaryInterceptor(grpctrace.UnaryServerInterceptor(global.Tracer(""))), + grpc.StreamInterceptor(grpctrace.StreamServerInterceptor(global.Tracer(""))), + ) api.RegisterHelloServiceServer(s, &server{}) if err := s.Serve(lis); err != nil { diff --git a/go.mod b/go.mod index b2d3c186e..6e0f86d99 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7 github.com/benbjohnson/clock v1.0.0 github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/protobuf v1.3.2 github.com/google/go-cmp v0.4.0 github.com/google/gofuzz v1.0.0 // indirect github.com/kr/pretty v0.1.0 // indirect diff --git a/go.sum b/go.sum index ff2203b12..613963bb6 100644 --- a/go.sum +++ b/go.sum @@ -45,6 +45,7 @@ golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -52,7 +53,9 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/plugin/grpctrace/interceptor.go b/plugin/grpctrace/interceptor.go new file mode 100644 index 000000000..344dccfd0 --- /dev/null +++ b/plugin/grpctrace/interceptor.go @@ -0,0 +1,461 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpctrace + +// gRPC tracing middleware +// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/rpc.md +import ( + "context" + "io" + "net" + "regexp" + + "github.com/golang/protobuf/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" + + "go.opentelemetry.io/otel/api/core" + "go.opentelemetry.io/otel/api/correlation" + "go.opentelemetry.io/otel/api/key" + "go.opentelemetry.io/otel/api/trace" +) + +var ( + rpcServiceKey = key.New("rpc.service") + netPeerIPKey = key.New("net.peer.ip") + netPeerPortKey = key.New("net.peer.port") + + messageTypeKey = key.New("message.type") + messageIDKey = key.New("message.id") + messageUncompressedSizeKey = key.New("message.uncompressed_size") +) + +const ( + messageTypeSent = "SENT" + messageTypeReceived = "RECEIVED" +) + +// UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable +// for use in a grpc.Dial call. +// +// For example: +// tracer := global.Tracer("client-tracer") +// s := grpc.NewServer( +// grpc.WithUnaryInterceptor(grpctrace.UnaryClientInterceptor(tracer)), +// ..., // (existing DialOptions)) +func UnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor { + return func( + ctx context.Context, + method string, + req, reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) error { + requestMetadata, _ := metadata.FromOutgoingContext(ctx) + metadataCopy := requestMetadata.Copy() + + var span trace.Span + ctx, span = tracer.Start( + ctx, method, + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes(peerInfoFromTarget(cc.Target())...), + trace.WithAttributes(rpcServiceKey.String(serviceFromFullMethod(method))), + ) + defer span.End() + + Inject(ctx, &metadataCopy) + ctx = metadata.NewOutgoingContext(ctx, metadataCopy) + + addEventForMessageSent(ctx, 1, req) + + err := invoker(ctx, method, req, reply, cc, opts...) + + addEventForMessageReceived(ctx, 1, reply) + + if err != nil { + s, _ := status.FromError(err) + span.SetStatus(s.Code(), s.Message()) + } + + return err + } +} + +type streamEventType int + +type streamEvent struct { + Type streamEventType + Err error +} + +const ( + closeEvent streamEventType = iota + receiveEndEvent + errorEvent +) + +// clientStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and +// SendMsg method call. +type clientStream struct { + grpc.ClientStream + + desc *grpc.StreamDesc + events chan streamEvent + finished chan error + + receivedMessageID int + sentMessageID int +} + +var _ = proto.Marshal + +func (w *clientStream) RecvMsg(m interface{}) error { + err := w.ClientStream.RecvMsg(m) + + if err == nil && !w.desc.ServerStreams { + w.events <- streamEvent{receiveEndEvent, nil} + } else if err == io.EOF { + w.events <- streamEvent{receiveEndEvent, nil} + } else if err != nil { + w.events <- streamEvent{errorEvent, err} + } else { + w.receivedMessageID++ + addEventForMessageReceived(w.Context(), w.receivedMessageID, m) + } + + return err +} + +func (w *clientStream) SendMsg(m interface{}) error { + err := w.ClientStream.SendMsg(m) + + w.sentMessageID++ + addEventForMessageSent(w.Context(), w.sentMessageID, m) + + if err != nil { + w.events <- streamEvent{errorEvent, err} + } + + return err +} + +func (w *clientStream) Header() (metadata.MD, error) { + md, err := w.ClientStream.Header() + + if err != nil { + w.events <- streamEvent{errorEvent, err} + } + + return md, err +} + +func (w *clientStream) CloseSend() error { + err := w.ClientStream.CloseSend() + + if err != nil { + w.events <- streamEvent{errorEvent, err} + } else { + w.events <- streamEvent{closeEvent, nil} + } + + return err +} + +const ( + clientClosedState byte = 1 << iota + receiveEndedState +) + +func wrapClientStream(s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream { + events := make(chan streamEvent, 1) + finished := make(chan error) + + go func() { + // Both streams have to be closed + state := byte(0) + + for event := range events { + switch event.Type { + case closeEvent: + state |= clientClosedState + case receiveEndEvent: + state |= receiveEndedState + case errorEvent: + finished <- event.Err + close(events) + } + + if state == clientClosedState|receiveEndedState { + finished <- nil + close(events) + } + } + }() + + return &clientStream{ + ClientStream: s, + desc: desc, + events: events, + finished: finished, + } +} + +// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable +// for use in a grpc.Dial call. +// +// For example: +// tracer := global.Tracer("client-tracer") +// s := grpc.Dial( +// grpc.WithStreamInterceptor(grpctrace.StreamClientInterceptor(tracer)), +// ..., // (existing DialOptions)) +func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor { + return func( + ctx context.Context, + desc *grpc.StreamDesc, + cc *grpc.ClientConn, + method string, + streamer grpc.Streamer, + opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + requestMetadata, _ := metadata.FromOutgoingContext(ctx) + metadataCopy := requestMetadata.Copy() + + var span trace.Span + ctx, span = tracer.Start( + ctx, method, + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes(peerInfoFromTarget(cc.Target())...), + trace.WithAttributes(rpcServiceKey.String(serviceFromFullMethod(method))), + ) + + Inject(ctx, &metadataCopy) + ctx = metadata.NewOutgoingContext(ctx, metadataCopy) + + s, err := streamer(ctx, desc, cc, method, opts...) + stream := wrapClientStream(s, desc) + + go func() { + if err == nil { + err = <-stream.finished + } + + if err != nil { + s, _ := status.FromError(err) + span.SetStatus(s.Code(), s.Message()) + } + + span.End() + }() + + return stream, err + } +} + +// UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable +// for use in a grpc.NewServer call. +// +// For example: +// tracer := global.Tracer("client-tracer") +// s := grpc.Dial( +// grpc.UnaryInterceptor(grpctrace.UnaryServerInterceptor(tracer)), +// ..., // (existing ServerOptions)) +func UnaryServerInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor { + return func( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (interface{}, error) { + requestMetadata, _ := metadata.FromIncomingContext(ctx) + metadataCopy := requestMetadata.Copy() + + entries, spanCtx := Extract(ctx, &metadataCopy) + ctx = correlation.ContextWithMap(ctx, correlation.NewMap(correlation.MapUpdate{ + MultiKV: entries, + })) + + ctx, span := tracer.Start( + trace.ContextWithRemoteSpanContext(ctx, spanCtx), + info.FullMethod, + trace.WithSpanKind(trace.SpanKindServer), + trace.WithAttributes(peerInfoFromContext(ctx)...), + trace.WithAttributes(rpcServiceKey.String(serviceFromFullMethod(info.FullMethod))), + ) + defer span.End() + + addEventForMessageReceived(ctx, 1, req) + + resp, err := handler(ctx, req) + + addEventForMessageSent(ctx, 1, resp) + + if err != nil { + s, _ := status.FromError(err) + span.SetStatus(s.Code(), s.Message()) + } + + return resp, err + } +} + +// clientStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and +// SendMsg method call. +type serverStream struct { + grpc.ServerStream + ctx context.Context + + receivedMessageID int + sentMessageID int +} + +func (w *serverStream) Context() context.Context { + return w.ctx +} + +func (w *serverStream) RecvMsg(m interface{}) error { + err := w.ServerStream.RecvMsg(m) + + if err == nil { + w.receivedMessageID++ + addEventForMessageReceived(w.Context(), w.receivedMessageID, m) + } + + return err +} + +func (w *serverStream) SendMsg(m interface{}) error { + err := w.ServerStream.SendMsg(m) + + w.sentMessageID++ + addEventForMessageSent(w.Context(), w.sentMessageID, m) + + return err +} + +func wrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream { + return &serverStream{ + ServerStream: ss, + ctx: ctx, + } +} + +// StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable +// for use in a grpc.NewServer call. +// +// For example: +// tracer := global.Tracer("client-tracer") +// s := grpc.Dial( +// grpc.StreamInterceptor(grpctrace.StreamServerInterceptor(tracer)), +// ..., // (existing ServerOptions)) +func StreamServerInterceptor(tracer trace.Tracer) grpc.StreamServerInterceptor { + return func( + srv interface{}, + ss grpc.ServerStream, + info *grpc.StreamServerInfo, + handler grpc.StreamHandler, + ) error { + ctx := ss.Context() + + requestMetadata, _ := metadata.FromIncomingContext(ctx) + metadataCopy := requestMetadata.Copy() + + entries, spanCtx := Extract(ctx, &metadataCopy) + ctx = correlation.ContextWithMap(ctx, correlation.NewMap(correlation.MapUpdate{ + MultiKV: entries, + })) + + ctx, span := tracer.Start( + trace.ContextWithRemoteSpanContext(ctx, spanCtx), + info.FullMethod, + trace.WithSpanKind(trace.SpanKindServer), + trace.WithAttributes(peerInfoFromContext(ctx)...), + trace.WithAttributes(rpcServiceKey.String(serviceFromFullMethod(info.FullMethod))), + ) + defer span.End() + + err := handler(srv, wrapServerStream(ctx, ss)) + + if err != nil { + s, _ := status.FromError(err) + span.SetStatus(s.Code(), s.Message()) + } + + return err + } +} + +func peerInfoFromTarget(target string) []core.KeyValue { + host, port, err := net.SplitHostPort(target) + + if err != nil { + return []core.KeyValue{} + } + + if host == "" { + host = "127.0.0.1" + } + + return []core.KeyValue{ + netPeerIPKey.String(host), + netPeerPortKey.String(port), + } +} + +func peerInfoFromContext(ctx context.Context) []core.KeyValue { + p, ok := peer.FromContext(ctx) + + if !ok { + return []core.KeyValue{} + } + + return peerInfoFromTarget(p.Addr.String()) +} + +var fullMethodRegexp = regexp.MustCompile(`^/\S*\.(\S*)/\S*$`) + +func serviceFromFullMethod(method string) string { + match := fullMethodRegexp.FindAllStringSubmatch(method, 1) + + if len(match) != 1 && len(match[1]) != 2 { + return "" + } + + return match[0][1] +} + +func addEventForMessageReceived(ctx context.Context, id int, m interface{}) { + size := proto.Size(m.(proto.Message)) + + span := trace.SpanFromContext(ctx) + span.AddEvent(ctx, "message", + messageTypeKey.String(messageTypeReceived), + messageIDKey.Int(id), + messageUncompressedSizeKey.Int(size), + ) +} + +func addEventForMessageSent(ctx context.Context, id int, m interface{}) { + size := proto.Size(m.(proto.Message)) + + span := trace.SpanFromContext(ctx) + span.AddEvent(ctx, "message", + messageTypeKey.String(messageTypeSent), + messageIDKey.Int(id), + messageUncompressedSizeKey.Int(size), + ) +}