mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-18 03:22:12 +02:00
Merge branch 'master' into jmacd/hist_search
This commit is contained in:
commit
afd79fbd6f
@ -156,14 +156,11 @@ The next step is to create the TraceProvider:
|
||||
```go
|
||||
tp, err := sdktrace.NewProvider(
|
||||
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
|
||||
sdktrace.WithResourceAttributes(
|
||||
sdktrace.WithResource(resource.New(
|
||||
// the service name used to display traces in Jaeger
|
||||
core.Key(conventions.AttributeServiceName).String("test-service"),
|
||||
),
|
||||
sdktrace.WithBatcher(exp, // add following two options to ensure flush
|
||||
sdktrace.WithScheduleDelayMillis(5),
|
||||
sdktrace.WithMaxExportBatchSize(2),
|
||||
))
|
||||
kv.Key(conventions.AttributeServiceName).String("test-service"),
|
||||
)),
|
||||
sdktrace.WithSyncer(exp))
|
||||
if err != nil {
|
||||
log.Fatalf("error creating trace provider: %v\n", err)
|
||||
}
|
||||
@ -175,7 +172,7 @@ After this, you can simply start sending traces:
|
||||
```go
|
||||
tracer := tp.Tracer("test-tracer")
|
||||
ctx, span := tracer.Start(context.Background(), "CollectorExporter-Example")
|
||||
defer span.End()
|
||||
defer span.End()
|
||||
```
|
||||
|
||||
The traces should now be visible from the Jaeger UI (if you have it installed), or thorough the jaeger-query service, under the name `test-service`.
|
||||
|
@ -414,7 +414,6 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
|
||||
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
|
||||
github.com/golang/protobuf v1.3.4 h1:87PNWwrRvUSnqS4dlcBU/ftvOIBep4sYuBLlh6rX2wk=
|
||||
github.com/golang/protobuf v1.3.4/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
|
||||
github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls=
|
||||
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
|
||||
|
@ -55,10 +55,7 @@ func main() {
|
||||
// the service name used to display traces in Jaeger
|
||||
kv.Key(conventions.AttributeServiceName).String("test-service"),
|
||||
)),
|
||||
sdktrace.WithBatcher(exp, // add following two options to ensure flush
|
||||
sdktrace.WithBatchTimeout(5),
|
||||
sdktrace.WithMaxExportBatchSize(2),
|
||||
))
|
||||
sdktrace.WithSyncer(exp))
|
||||
if err != nil {
|
||||
log.Fatalf("error creating trace provider: %v\n", err)
|
||||
}
|
||||
@ -75,6 +72,4 @@ func main() {
|
||||
}
|
||||
|
||||
span.End()
|
||||
// Wait 1 second before ending
|
||||
<-time.After(time.Second)
|
||||
}
|
||||
|
@ -133,9 +133,10 @@ const (
|
||||
type clientStream struct {
|
||||
grpc.ClientStream
|
||||
|
||||
desc *grpc.StreamDesc
|
||||
events chan streamEvent
|
||||
finished chan error
|
||||
desc *grpc.StreamDesc
|
||||
events chan streamEvent
|
||||
eventsDone chan struct{}
|
||||
finished chan error
|
||||
|
||||
receivedMessageID int
|
||||
sentMessageID int
|
||||
@ -147,11 +148,11 @@ func (w *clientStream) RecvMsg(m interface{}) error {
|
||||
err := w.ClientStream.RecvMsg(m)
|
||||
|
||||
if err == nil && !w.desc.ServerStreams {
|
||||
w.events <- streamEvent{receiveEndEvent, nil}
|
||||
w.sendStreamEvent(receiveEndEvent, nil)
|
||||
} else if err == io.EOF {
|
||||
w.events <- streamEvent{receiveEndEvent, nil}
|
||||
w.sendStreamEvent(receiveEndEvent, nil)
|
||||
} else if err != nil {
|
||||
w.events <- streamEvent{errorEvent, err}
|
||||
w.sendStreamEvent(errorEvent, err)
|
||||
} else {
|
||||
w.receivedMessageID++
|
||||
messageReceived.Event(w.Context(), w.receivedMessageID, m)
|
||||
@ -167,7 +168,7 @@ func (w *clientStream) SendMsg(m interface{}) error {
|
||||
messageSent.Event(w.Context(), w.sentMessageID, m)
|
||||
|
||||
if err != nil {
|
||||
w.events <- streamEvent{errorEvent, err}
|
||||
w.sendStreamEvent(errorEvent, err)
|
||||
}
|
||||
|
||||
return err
|
||||
@ -177,7 +178,7 @@ func (w *clientStream) Header() (metadata.MD, error) {
|
||||
md, err := w.ClientStream.Header()
|
||||
|
||||
if err != nil {
|
||||
w.events <- streamEvent{errorEvent, err}
|
||||
w.sendStreamEvent(errorEvent, err)
|
||||
}
|
||||
|
||||
return md, err
|
||||
@ -187,9 +188,9 @@ func (w *clientStream) CloseSend() error {
|
||||
err := w.ClientStream.CloseSend()
|
||||
|
||||
if err != nil {
|
||||
w.events <- streamEvent{errorEvent, err}
|
||||
w.sendStreamEvent(errorEvent, err)
|
||||
} else {
|
||||
w.events <- streamEvent{closeEvent, nil}
|
||||
w.sendStreamEvent(closeEvent, nil)
|
||||
}
|
||||
|
||||
return err
|
||||
@ -201,10 +202,13 @@ const (
|
||||
)
|
||||
|
||||
func wrapClientStream(s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream {
|
||||
events := make(chan streamEvent, 1)
|
||||
events := make(chan streamEvent)
|
||||
eventsDone := make(chan struct{})
|
||||
finished := make(chan error)
|
||||
|
||||
go func() {
|
||||
defer close(eventsDone)
|
||||
|
||||
// Both streams have to be closed
|
||||
state := byte(0)
|
||||
|
||||
@ -216,12 +220,12 @@ func wrapClientStream(s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream
|
||||
state |= receiveEndedState
|
||||
case errorEvent:
|
||||
finished <- event.Err
|
||||
close(events)
|
||||
return
|
||||
}
|
||||
|
||||
if state == clientClosedState|receiveEndedState {
|
||||
finished <- nil
|
||||
close(events)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -230,10 +234,18 @@ func wrapClientStream(s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream
|
||||
ClientStream: s,
|
||||
desc: desc,
|
||||
events: events,
|
||||
eventsDone: eventsDone,
|
||||
finished: finished,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
|
||||
select {
|
||||
case <-w.eventsDone:
|
||||
case w.events <- streamEvent{Type: eventType, Err: err}:
|
||||
}
|
||||
}
|
||||
|
||||
// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
|
||||
// for use in a grpc.Dial call.
|
||||
//
|
||||
|
@ -376,6 +376,9 @@ func TestStreamClientInterceptor(t *testing.T) {
|
||||
validate("SENT", events[i].Attributes)
|
||||
validate("RECEIVED", events[i+1].Attributes)
|
||||
}
|
||||
|
||||
// ensure CloseSend can be subsequently called
|
||||
_ = streamClient.CloseSend()
|
||||
}
|
||||
|
||||
func TestServerInterceptorError(t *testing.T) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user