From f9bf364f23da5cfca858c3dc48ec752c0a065ac5 Mon Sep 17 00:00:00 2001 From: Dave McGregor Date: Wed, 20 May 2020 18:07:53 -0400 Subject: [PATCH 1/2] Ensure gRPC ClientStream override methods do not panic. Previously, the channel used to aggregate the finished state of the stream could be closed while still open to receiving stream state events. This removes the closing of the channel, and instead adds a "done" channel that is used to skip sending to the channel after the receiver is done. --- plugin/grpctrace/interceptor.go | 38 ++++++++++++++++++---------- plugin/grpctrace/interceptor_test.go | 3 +++ 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/plugin/grpctrace/interceptor.go b/plugin/grpctrace/interceptor.go index 0981954c4..6c8b6315e 100644 --- a/plugin/grpctrace/interceptor.go +++ b/plugin/grpctrace/interceptor.go @@ -133,9 +133,10 @@ const ( type clientStream struct { grpc.ClientStream - desc *grpc.StreamDesc - events chan streamEvent - finished chan error + desc *grpc.StreamDesc + events chan streamEvent + eventsDone chan struct{} + finished chan error receivedMessageID int sentMessageID int @@ -147,11 +148,11 @@ func (w *clientStream) RecvMsg(m interface{}) error { err := w.ClientStream.RecvMsg(m) if err == nil && !w.desc.ServerStreams { - w.events <- streamEvent{receiveEndEvent, nil} + w.sendStreamEvent(receiveEndEvent, nil) } else if err == io.EOF { - w.events <- streamEvent{receiveEndEvent, nil} + w.sendStreamEvent(receiveEndEvent, nil) } else if err != nil { - w.events <- streamEvent{errorEvent, err} + w.sendStreamEvent(errorEvent, err) } else { w.receivedMessageID++ messageReceived.Event(w.Context(), w.receivedMessageID, m) @@ -167,7 +168,7 @@ func (w *clientStream) SendMsg(m interface{}) error { messageSent.Event(w.Context(), w.sentMessageID, m) if err != nil { - w.events <- streamEvent{errorEvent, err} + w.sendStreamEvent(errorEvent, err) } return err @@ -177,7 +178,7 @@ func (w *clientStream) Header() (metadata.MD, error) { md, err := w.ClientStream.Header() if err != nil { - w.events <- streamEvent{errorEvent, err} + w.sendStreamEvent(errorEvent, err) } return md, err @@ -187,9 +188,9 @@ func (w *clientStream) CloseSend() error { err := w.ClientStream.CloseSend() if err != nil { - w.events <- streamEvent{errorEvent, err} + w.sendStreamEvent(errorEvent, err) } else { - w.events <- streamEvent{closeEvent, nil} + w.sendStreamEvent(closeEvent, nil) } return err @@ -201,10 +202,13 @@ const ( ) func wrapClientStream(s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream { - events := make(chan streamEvent, 1) + events := make(chan streamEvent) + eventsDone := make(chan struct{}) finished := make(chan error) go func() { + defer close(eventsDone) + // Both streams have to be closed state := byte(0) @@ -216,12 +220,12 @@ func wrapClientStream(s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream state |= receiveEndedState case errorEvent: finished <- event.Err - close(events) + return } if state == clientClosedState|receiveEndedState { finished <- nil - close(events) + return } } }() @@ -230,10 +234,18 @@ func wrapClientStream(s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream ClientStream: s, desc: desc, events: events, + eventsDone: eventsDone, finished: finished, } } +func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) { + select { + case <-w.eventsDone: + case w.events <- streamEvent{Type: eventType, Err: err}: + } +} + // StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable // for use in a grpc.Dial call. // diff --git a/plugin/grpctrace/interceptor_test.go b/plugin/grpctrace/interceptor_test.go index 211a9c36e..db12a3a30 100644 --- a/plugin/grpctrace/interceptor_test.go +++ b/plugin/grpctrace/interceptor_test.go @@ -376,6 +376,9 @@ func TestStreamClientInterceptor(t *testing.T) { validate("SENT", events[i].Attributes) validate("RECEIVED", events[i+1].Attributes) } + + // ensure CloseSend can be subsequently called + _ = streamClient.CloseSend() } func TestServerInterceptorError(t *testing.T) { From 7329ccc8231bb8412dfe0508c0918e1a17c61ec0 Mon Sep 17 00:00:00 2001 From: Stefan Prisca Date: Thu, 21 May 2020 20:40:11 +0200 Subject: [PATCH 2/2] Change OTLP example to use syncer (#756) * change otlp-example to use syncer * precommit Co-authored-by: Joshua MacDonald --- example/otel-collector/README.md | 13 +++++-------- example/otel-collector/go.sum | 1 - example/otel-collector/main.go | 7 +------ 3 files changed, 6 insertions(+), 15 deletions(-) diff --git a/example/otel-collector/README.md b/example/otel-collector/README.md index 67c4857ce..382d41ae7 100644 --- a/example/otel-collector/README.md +++ b/example/otel-collector/README.md @@ -156,14 +156,11 @@ The next step is to create the TraceProvider: ```go tp, err := sdktrace.NewProvider( sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), - sdktrace.WithResourceAttributes( + sdktrace.WithResource(resource.New( // the service name used to display traces in Jaeger - core.Key(conventions.AttributeServiceName).String("test-service"), - ), - sdktrace.WithBatcher(exp, // add following two options to ensure flush - sdktrace.WithScheduleDelayMillis(5), - sdktrace.WithMaxExportBatchSize(2), - )) + kv.Key(conventions.AttributeServiceName).String("test-service"), + )), + sdktrace.WithSyncer(exp)) if err != nil { log.Fatalf("error creating trace provider: %v\n", err) } @@ -175,7 +172,7 @@ After this, you can simply start sending traces: ```go tracer := tp.Tracer("test-tracer") ctx, span := tracer.Start(context.Background(), "CollectorExporter-Example") - defer span.End() +defer span.End() ``` The traces should now be visible from the Jaeger UI (if you have it installed), or thorough the jaeger-query service, under the name `test-service`. diff --git a/example/otel-collector/go.sum b/example/otel-collector/go.sum index 683a554df..7f08f906d 100644 --- a/example/otel-collector/go.sum +++ b/example/otel-collector/go.sum @@ -414,7 +414,6 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= -github.com/golang/protobuf v1.3.4 h1:87PNWwrRvUSnqS4dlcBU/ftvOIBep4sYuBLlh6rX2wk= github.com/golang/protobuf v1.3.4/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= diff --git a/example/otel-collector/main.go b/example/otel-collector/main.go index c16002f7e..446258162 100644 --- a/example/otel-collector/main.go +++ b/example/otel-collector/main.go @@ -55,10 +55,7 @@ func main() { // the service name used to display traces in Jaeger kv.Key(conventions.AttributeServiceName).String("test-service"), )), - sdktrace.WithBatcher(exp, // add following two options to ensure flush - sdktrace.WithBatchTimeout(5), - sdktrace.WithMaxExportBatchSize(2), - )) + sdktrace.WithSyncer(exp)) if err != nil { log.Fatalf("error creating trace provider: %v\n", err) } @@ -75,6 +72,4 @@ func main() { } span.End() - // Wait 1 second before ending - <-time.After(time.Second) }