mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-26 03:52:03 +02:00
Merge branch 'master' into handler-test
This commit is contained in:
commit
a0a1abc666
@ -63,7 +63,7 @@ func initProvider() (*otlp.Exporter, *push.Controller) {
|
||||
|
||||
func main() {
|
||||
exp, pusher := initProvider()
|
||||
defer handleErr(exp.Stop(), "Failed to stop exporter")
|
||||
defer func() { handleErr(exp.Stop(), "Failed to stop exporter") }()
|
||||
defer pusher.Stop() // pushes any last exports to the receiver
|
||||
|
||||
tracer := global.Tracer("mage-sense")
|
||||
|
@ -92,7 +92,7 @@ Some compressors auto-register on import, such as gzip, which can be registered
|
||||
|
||||
### `WithHeaders(headers map[string]string)`
|
||||
|
||||
Headers to send when the gRPC stream connection is instantiated.
|
||||
Headers to send with gRPC requests.
|
||||
|
||||
### `WithTLSCredentials(creds "google.golang.org/grpc/credentials".TransportCredentials)`
|
||||
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
metadata "google.golang.org/grpc/metadata"
|
||||
|
||||
colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1"
|
||||
coltracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/trace/v1"
|
||||
@ -44,8 +45,13 @@ func makeMockCollector(t *testing.T) *mockCol {
|
||||
}
|
||||
|
||||
type mockTraceService struct {
|
||||
mu sync.RWMutex
|
||||
rsm map[string]*tracepb.ResourceSpans
|
||||
mu sync.RWMutex
|
||||
rsm map[string]*tracepb.ResourceSpans
|
||||
headers metadata.MD
|
||||
}
|
||||
|
||||
func (mts *mockTraceService) getHeaders() metadata.MD {
|
||||
return mts.headers
|
||||
}
|
||||
|
||||
func (mts *mockTraceService) getSpans() []*tracepb.Span {
|
||||
@ -70,6 +76,7 @@ func (mts *mockTraceService) getResourceSpans() []*tracepb.ResourceSpans {
|
||||
|
||||
func (mts *mockTraceService) Export(ctx context.Context, exp *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) {
|
||||
mts.mu.Lock()
|
||||
mts.headers, _ = metadata.FromIncomingContext(ctx)
|
||||
defer mts.mu.Unlock()
|
||||
rss := exp.GetResourceSpans()
|
||||
for _, rs := range rss {
|
||||
@ -192,6 +199,10 @@ func (mc *mockCol) getResourceSpans() []*tracepb.ResourceSpans {
|
||||
return mc.traceSvc.getResourceSpans()
|
||||
}
|
||||
|
||||
func (mc *mockCol) getHeaders() metadata.MD {
|
||||
return mc.traceSvc.getHeaders()
|
||||
}
|
||||
|
||||
func (mc *mockCol) getMetrics() []*metricpb.Metric {
|
||||
return mc.metricSvc.getMetrics()
|
||||
}
|
||||
|
@ -122,8 +122,7 @@ func WithCompressor(compressor string) ExporterOption {
|
||||
}
|
||||
}
|
||||
|
||||
// WithHeaders will send the provided headers when the gRPC stream connection
|
||||
// is instantiated.
|
||||
// WithHeaders will send the provided headers with gRPC requests
|
||||
func WithHeaders(headers map[string]string) ExporterOption {
|
||||
return func(cfg *Config) {
|
||||
cfg.headers = headers
|
||||
|
@ -50,7 +50,8 @@ type Exporter struct {
|
||||
|
||||
backgroundConnectionDoneCh chan bool
|
||||
|
||||
c Config
|
||||
c Config
|
||||
metadata metadata.MD
|
||||
}
|
||||
|
||||
var _ tracesdk.SpanBatcher = (*Exporter)(nil)
|
||||
@ -77,6 +78,9 @@ func NewUnstartedExporter(opts ...ExporterOption) *Exporter {
|
||||
grpcServiceConfig: DefaultGRPCServiceConfig,
|
||||
}
|
||||
configureOptions(&e.c, opts...)
|
||||
if len(e.c.headers) > 0 {
|
||||
e.metadata = metadata.New(e.c.headers)
|
||||
}
|
||||
|
||||
// TODO (rghetia): add resources
|
||||
|
||||
@ -158,6 +162,13 @@ func (e *Exporter) enableConnections(cc *grpc.ClientConn) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Exporter) contextWithMetadata(ctx context.Context) context.Context {
|
||||
if e.metadata.Len() > 0 {
|
||||
return metadata.NewOutgoingContext(ctx, e.metadata)
|
||||
}
|
||||
return ctx
|
||||
}
|
||||
|
||||
func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) {
|
||||
addr := e.prepareCollectorAddress()
|
||||
|
||||
@ -177,10 +188,7 @@ func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) {
|
||||
dialOpts = append(dialOpts, e.c.grpcDialOptions...)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
if len(e.c.headers) > 0 {
|
||||
ctx = metadata.NewOutgoingContext(ctx, metadata.New(e.c.headers))
|
||||
}
|
||||
ctx := e.contextWithMetadata(context.Background())
|
||||
return grpc.DialContext(ctx, addr, dialOpts...)
|
||||
}
|
||||
|
||||
@ -246,7 +254,7 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e
|
||||
return errContextCanceled
|
||||
default:
|
||||
e.senderMu.Lock()
|
||||
_, err := e.metricExporter.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{
|
||||
_, err := e.metricExporter.Export(e.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{
|
||||
ResourceMetrics: rms,
|
||||
})
|
||||
e.senderMu.Unlock()
|
||||
@ -281,7 +289,7 @@ func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) {
|
||||
}
|
||||
|
||||
e.senderMu.Lock()
|
||||
_, err := e.traceExporter.Export(ctx, &coltracepb.ExportTraceServiceRequest{
|
||||
_, err := e.traceExporter.Export(e.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{
|
||||
ResourceSpans: protoSpans,
|
||||
})
|
||||
e.senderMu.Unlock()
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
|
||||
|
||||
@ -400,3 +401,26 @@ func TestNewExporter_withAddress(t *testing.T) {
|
||||
t.Fatalf("Unexpected Start error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewExporter_withHeaders(t *testing.T) {
|
||||
mc := runMockCol(t)
|
||||
defer func() {
|
||||
_ = mc.stop()
|
||||
}()
|
||||
|
||||
exp, _ := otlp.NewExporter(
|
||||
otlp.WithInsecure(),
|
||||
otlp.WithReconnectionPeriod(50*time.Millisecond),
|
||||
otlp.WithAddress(mc.address),
|
||||
otlp.WithHeaders(map[string]string{"header1": "value1"}),
|
||||
)
|
||||
exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "in the midst"}})
|
||||
|
||||
defer func() {
|
||||
_ = exp.Stop()
|
||||
}()
|
||||
|
||||
headers := mc.getHeaders()
|
||||
require.Len(t, headers.Get("header1"), 1)
|
||||
assert.Equal(t, "value1", headers.Get("header1")[0])
|
||||
}
|
||||
|
@ -105,9 +105,10 @@ func NewBatchSpanProcessor(e export.SpanBatcher, opts ...BatchSpanProcessorOptio
|
||||
queue: make(chan *export.SpanData, o.MaxQueueSize),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
bsp.stopWait.Add(1)
|
||||
|
||||
bsp.stopWait.Add(1)
|
||||
go func() {
|
||||
defer bsp.stopWait.Done()
|
||||
bsp.processQueue()
|
||||
bsp.drainQueue()
|
||||
}()
|
||||
@ -130,8 +131,6 @@ func (bsp *BatchSpanProcessor) Shutdown() {
|
||||
bsp.stopOnce.Do(func() {
|
||||
close(bsp.stopCh)
|
||||
bsp.stopWait.Wait()
|
||||
close(bsp.queue)
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
@ -173,7 +172,6 @@ func (bsp *BatchSpanProcessor) exportSpans() {
|
||||
// is shut down. It calls the exporter in batches of up to MaxExportBatchSize
|
||||
// waiting up to BatchTimeout to form a batch.
|
||||
func (bsp *BatchSpanProcessor) processQueue() {
|
||||
defer bsp.stopWait.Done()
|
||||
defer bsp.timer.Stop()
|
||||
|
||||
for {
|
||||
@ -197,13 +195,22 @@ func (bsp *BatchSpanProcessor) processQueue() {
|
||||
// drainQueue awaits the any caller that had added to bsp.stopWait
|
||||
// to finish the enqueue, then exports the final batch.
|
||||
func (bsp *BatchSpanProcessor) drainQueue() {
|
||||
for sd := range bsp.queue {
|
||||
bsp.batch = append(bsp.batch, sd)
|
||||
if len(bsp.batch) == bsp.o.MaxExportBatchSize {
|
||||
bsp.exportSpans()
|
||||
for {
|
||||
select {
|
||||
case sd := <-bsp.queue:
|
||||
if sd == nil {
|
||||
bsp.exportSpans()
|
||||
return
|
||||
}
|
||||
|
||||
bsp.batch = append(bsp.batch, sd)
|
||||
if len(bsp.batch) == bsp.o.MaxExportBatchSize {
|
||||
bsp.exportSpans()
|
||||
}
|
||||
default:
|
||||
close(bsp.queue)
|
||||
}
|
||||
}
|
||||
bsp.exportSpans()
|
||||
}
|
||||
|
||||
func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
|
||||
@ -226,17 +233,19 @@ func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
|
||||
panic(x)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-bsp.stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if bsp.o.BlockOnQueueFull {
|
||||
select {
|
||||
case bsp.queue <- sd:
|
||||
case <-bsp.stopCh:
|
||||
}
|
||||
bsp.queue <- sd
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case bsp.queue <- sd:
|
||||
case <-bsp.stopCh:
|
||||
default:
|
||||
atomic.AddUint32(&bsp.dropped, 1)
|
||||
}
|
||||
|
@ -117,7 +117,6 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
|
||||
sdktrace.WithBatchTimeout(schDelay),
|
||||
sdktrace.WithMaxQueueSize(200),
|
||||
sdktrace.WithMaxExportBatchSize(20),
|
||||
sdktrace.WithBlocking(),
|
||||
},
|
||||
wantNumSpans: 205,
|
||||
wantBatchCount: 11,
|
||||
@ -139,7 +138,6 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
|
||||
o: []sdktrace.BatchSpanProcessorOption{
|
||||
sdktrace.WithBatchTimeout(schDelay),
|
||||
sdktrace.WithMaxExportBatchSize(200),
|
||||
sdktrace.WithBlocking(),
|
||||
},
|
||||
wantNumSpans: 2000,
|
||||
wantBatchCount: 10,
|
||||
@ -162,18 +160,26 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
|
||||
|
||||
tp.UnregisterSpanProcessor(ssp)
|
||||
|
||||
// TODO(https://github.com/open-telemetry/opentelemetry-go/issues/741)
|
||||
// Restore some sort of test here.
|
||||
_ = option.wantNumSpans
|
||||
_ = option.wantBatchCount
|
||||
_ = te.len() // gotNumOfSpans
|
||||
_ = te.getBatchCount() // gotBatchCount
|
||||
gotNumOfSpans := te.len()
|
||||
if option.wantNumSpans != gotNumOfSpans {
|
||||
t.Errorf("number of exported span: got %+v, want %+v\n",
|
||||
gotNumOfSpans, option.wantNumSpans)
|
||||
}
|
||||
|
||||
gotBatchCount := te.getBatchCount()
|
||||
if gotBatchCount < option.wantBatchCount {
|
||||
t.Errorf("number batches: got %+v, want >= %+v\n",
|
||||
gotBatchCount, option.wantBatchCount)
|
||||
t.Errorf("Batches %v\n", te.sizes)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func createAndRegisterBatchSP(t *testing.T, option testOption, te *testBatchExporter) *sdktrace.BatchSpanProcessor {
|
||||
ssp, err := sdktrace.NewBatchSpanProcessor(te, option.o...)
|
||||
// Always use blocking queue to avoid flaky tests.
|
||||
options := append(option.o, sdktrace.WithBlocking())
|
||||
ssp, err := sdktrace.NewBatchSpanProcessor(te, options...)
|
||||
if ssp == nil {
|
||||
t.Errorf("%s: Error creating new instance of BatchSpanProcessor, error: %v\n", option.name, err)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user