You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
add resource option to Provider. (#545)
- update otlp exporter to export resources.
This commit is contained in:
@@ -18,6 +18,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -26,42 +27,80 @@ import (
|
||||
|
||||
colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1"
|
||||
coltracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/trace/v1"
|
||||
commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
|
||||
metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
|
||||
resourcepb "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1"
|
||||
tracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1"
|
||||
)
|
||||
|
||||
func makeMockCollector(t *testing.T) *mockCol {
|
||||
return &mockCol{
|
||||
t: t,
|
||||
traceSvc: &mockTraceService{},
|
||||
t: t,
|
||||
traceSvc: &mockTraceService{
|
||||
rsm: map[string]*tracepb.ResourceSpans{},
|
||||
},
|
||||
metricSvc: &mockMetricService{},
|
||||
}
|
||||
}
|
||||
|
||||
type mockTraceService struct {
|
||||
mu sync.RWMutex
|
||||
spans []*tracepb.Span
|
||||
mu sync.RWMutex
|
||||
rsm map[string]*tracepb.ResourceSpans
|
||||
}
|
||||
|
||||
func (mts *mockTraceService) getSpans() []*tracepb.Span {
|
||||
mts.mu.RLock()
|
||||
spans := append([]*tracepb.Span{}, mts.spans...)
|
||||
mts.mu.RUnlock()
|
||||
|
||||
defer mts.mu.RUnlock()
|
||||
spans := []*tracepb.Span{}
|
||||
for _, rs := range mts.rsm {
|
||||
spans = append(spans, rs.Spans...)
|
||||
}
|
||||
return spans
|
||||
}
|
||||
|
||||
func (mts *mockTraceService) Export(ctx context.Context, exp *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) {
|
||||
resourceSpans := exp.GetResourceSpans()
|
||||
// TODO (rghetia): handle Resources
|
||||
mts.mu.Lock()
|
||||
for _, rs := range resourceSpans {
|
||||
mts.spans = append(mts.spans, rs.Spans...)
|
||||
func (mts *mockTraceService) getResourceSpans() []*tracepb.ResourceSpans {
|
||||
mts.mu.RLock()
|
||||
defer mts.mu.RUnlock()
|
||||
rss := make([]*tracepb.ResourceSpans, 0, len(mts.rsm))
|
||||
for _, rs := range mts.rsm {
|
||||
rss = append(rss, rs)
|
||||
}
|
||||
return rss
|
||||
}
|
||||
|
||||
func (mts *mockTraceService) Export(ctx context.Context, exp *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) {
|
||||
mts.mu.Lock()
|
||||
defer mts.mu.Unlock()
|
||||
rss := exp.GetResourceSpans()
|
||||
for _, rs := range rss {
|
||||
rstr := resourceString(rs.Resource)
|
||||
existingRs, ok := mts.rsm[rstr]
|
||||
if !ok {
|
||||
mts.rsm[rstr] = rs
|
||||
} else {
|
||||
existingRs.Spans = append(existingRs.Spans, rs.GetSpans()...)
|
||||
}
|
||||
}
|
||||
mts.mu.Unlock()
|
||||
return &coltracepb.ExportTraceServiceResponse{}, nil
|
||||
}
|
||||
|
||||
func resourceString(res *resourcepb.Resource) string {
|
||||
sAttrs := sortedAttributes(res.GetAttributes())
|
||||
rstr := ""
|
||||
for _, attr := range sAttrs {
|
||||
rstr = rstr + attr.String()
|
||||
|
||||
}
|
||||
return rstr
|
||||
}
|
||||
|
||||
func sortedAttributes(attrs []*commonpb.AttributeKeyValue) []*commonpb.AttributeKeyValue {
|
||||
sort.Slice(attrs[:], func(i, j int) bool {
|
||||
return attrs[i].Key < attrs[j].Key
|
||||
})
|
||||
return attrs
|
||||
}
|
||||
|
||||
type mockMetricService struct {
|
||||
mu sync.RWMutex
|
||||
metrics []*metricpb.Metric
|
||||
@@ -134,6 +173,10 @@ func (mc *mockCol) getSpans() []*tracepb.Span {
|
||||
return mc.traceSvc.getSpans()
|
||||
}
|
||||
|
||||
func (mc *mockCol) getResourceSpans() []*tracepb.ResourceSpans {
|
||||
return mc.traceSvc.getResourceSpans()
|
||||
}
|
||||
|
||||
func (mc *mockCol) getMetrics() []*metricpb.Metric {
|
||||
return mc.metricSvc.getMetrics()
|
||||
}
|
||||
|
||||
+17
-7
@@ -23,6 +23,8 @@ import (
|
||||
"sync"
|
||||
"unsafe"
|
||||
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
@@ -346,18 +348,26 @@ func otSpanDataToPbSpans(sdl []*tracesdk.SpanData) []*tracepb.ResourceSpans {
|
||||
if len(sdl) == 0 {
|
||||
return nil
|
||||
}
|
||||
protoSpans := make([]*tracepb.Span, 0, len(sdl))
|
||||
rsm := make(map[*resource.Resource]*tracepb.ResourceSpans)
|
||||
|
||||
for _, sd := range sdl {
|
||||
if sd != nil {
|
||||
protoSpans = append(protoSpans, otSpanToProtoSpan(sd))
|
||||
rs, ok := rsm[sd.Resource]
|
||||
if !ok {
|
||||
rs = &tracepb.ResourceSpans{
|
||||
Resource: otResourceToProtoResource(sd.Resource),
|
||||
Spans: []*tracepb.Span{},
|
||||
}
|
||||
rsm[sd.Resource] = rs
|
||||
}
|
||||
rs.Spans = append(rs.Spans, otSpanToProtoSpan(sd))
|
||||
}
|
||||
}
|
||||
return []*tracepb.ResourceSpans{
|
||||
{
|
||||
Resource: nil,
|
||||
Spans: protoSpans,
|
||||
},
|
||||
rss := make([]*tracepb.ResourceSpans, 0, len(rsm))
|
||||
for _, rs := range rsm {
|
||||
rss = append(rss, rs)
|
||||
}
|
||||
return rss
|
||||
}
|
||||
|
||||
func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) {
|
||||
|
||||
+41
-15
@@ -78,21 +78,33 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
|
||||
_ = exp.Stop()
|
||||
}()
|
||||
|
||||
tp, err := sdktrace.NewProvider(
|
||||
pOpts := []sdktrace.ProviderOption{
|
||||
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
|
||||
sdktrace.WithBatcher(exp, // add following two options to ensure flush
|
||||
sdktrace.WithScheduleDelayMillis(15),
|
||||
sdktrace.WithMaxExportBatchSize(10),
|
||||
))
|
||||
),
|
||||
}
|
||||
tp1, err := sdktrace.NewProvider(append(pOpts,
|
||||
sdktrace.WithResourceAttributes(core.Key("rk1").String("rv11)"),
|
||||
core.Key("rk2").Int64(5)))...)
|
||||
assert.NoError(t, err)
|
||||
|
||||
//global.SetTraceProvider(tp)
|
||||
tp2, err := sdktrace.NewProvider(append(pOpts,
|
||||
sdktrace.WithResourceAttributes(core.Key("rk1").String("rv12)"),
|
||||
core.Key("rk3").Float32(6.5)))...)
|
||||
assert.NoError(t, err)
|
||||
|
||||
tr := tp.Tracer("test-tracer")
|
||||
tr1 := tp1.Tracer("test-tracer1")
|
||||
tr2 := tp2.Tracer("test-tracer2")
|
||||
// Now create few spans
|
||||
m := 4
|
||||
for i := 0; i < m; i++ {
|
||||
_, span := tr.Start(context.Background(), "AlwaysSample")
|
||||
_, span := tr1.Start(context.Background(), "AlwaysSample")
|
||||
span.SetAttributes(core.Key("i").Int64(int64(i)))
|
||||
span.End()
|
||||
|
||||
_, span = tr2.Start(context.Background(), "AlwaysSample")
|
||||
span.SetAttributes(core.Key("i").Int64(int64(i)))
|
||||
span.End()
|
||||
}
|
||||
@@ -174,18 +186,32 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
|
||||
// verification checks of expected data back.
|
||||
_ = mc.stop()
|
||||
|
||||
spans := mc.getSpans()
|
||||
|
||||
// Now verify that we received all spans.
|
||||
if got, want := len(spans), m; got != want {
|
||||
t.Fatalf("span counts: got %d, want %d", got, want)
|
||||
// Now verify that we only got two resources
|
||||
rss := mc.getResourceSpans()
|
||||
if got, want := len(rss), 2; got != want {
|
||||
t.Fatalf("resource span count: got %d, want %d\n", got, want)
|
||||
}
|
||||
for i := 0; i < 4; i++ {
|
||||
if gotName, want := spans[i].Name, "AlwaysSample"; gotName != want {
|
||||
t.Fatalf("span name: got %s, want %s", gotName, want)
|
||||
|
||||
// Now verify spans and attributes for each resource span.
|
||||
for _, rs := range rss {
|
||||
if got, want := len(rs.Spans), m; got != want {
|
||||
t.Fatalf("span counts: got %d, want %d", got, want)
|
||||
}
|
||||
if got, want := spans[i].Attributes[0].IntValue, int64(i); got != want {
|
||||
t.Fatalf("span attribute value: got %d, want %d", got, want)
|
||||
attrMap := map[int64]bool{}
|
||||
for _, s := range rs.Spans {
|
||||
if gotName, want := s.Name, "AlwaysSample"; gotName != want {
|
||||
t.Fatalf("span name: got %s, want %s", gotName, want)
|
||||
}
|
||||
attrMap[s.Attributes[0].IntValue] = true
|
||||
}
|
||||
if got, want := len(attrMap), m; got != want {
|
||||
t.Fatalf("span attribute unique values: got %d want %d", got, want)
|
||||
}
|
||||
for i := 0; i < m; i++ {
|
||||
_, ok := attrMap[int64(i)]
|
||||
if !ok {
|
||||
t.Fatalf("span with attribute %d missing", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,10 @@ package otlp
|
||||
import (
|
||||
"google.golang.org/grpc/codes"
|
||||
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
|
||||
commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
|
||||
resourcepb "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1"
|
||||
tracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1"
|
||||
|
||||
"go.opentelemetry.io/otel/api/core"
|
||||
@@ -29,6 +32,16 @@ const (
|
||||
maxMessageEventsPerSpan = 128
|
||||
)
|
||||
|
||||
func otResourceToProtoResource(res *resource.Resource) *resourcepb.Resource {
|
||||
if res == nil {
|
||||
return nil
|
||||
}
|
||||
resProto := &resourcepb.Resource{
|
||||
Attributes: otAttributesToProtoAttributes(res.Attributes()),
|
||||
}
|
||||
return resProto
|
||||
}
|
||||
|
||||
func otSpanToProtoSpan(sd *export.SpanData) *tracepb.Span {
|
||||
if sd == nil {
|
||||
return nil
|
||||
|
||||
@@ -19,11 +19,14 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"google.golang.org/grpc/codes"
|
||||
|
||||
commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
|
||||
resourcepb "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1"
|
||||
tracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1"
|
||||
|
||||
"go.opentelemetry.io/otel/api/core"
|
||||
@@ -33,21 +36,21 @@ import (
|
||||
)
|
||||
|
||||
type testCases struct {
|
||||
otSpan *export.SpanData
|
||||
otlpSpan *tracepb.Span
|
||||
otSpan []*export.SpanData
|
||||
otlpResSpan *tracepb.ResourceSpans
|
||||
}
|
||||
|
||||
func TestOtSpanToOtlpSpan_Basic(t *testing.T) {
|
||||
// The goal of this test is to ensure that each
|
||||
// spanData is transformed and exported correctly!
|
||||
testAndVerify("Basic End-2-End", t, func(t *testing.T) []testCases {
|
||||
testAndVerify("Basic End-2-End", t, func(t *testing.T) testCases {
|
||||
|
||||
startTime := time.Now()
|
||||
endTime := startTime.Add(10 * time.Second)
|
||||
|
||||
tcs := []testCases{
|
||||
{
|
||||
otSpan: &export.SpanData{
|
||||
tc := testCases{
|
||||
otSpan: []*export.SpanData{
|
||||
{
|
||||
SpanContext: core.SpanContext{
|
||||
TraceID: core.TraceID{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F},
|
||||
SpanID: core.SpanID{0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA, 0xF9, 0xF8},
|
||||
@@ -100,99 +103,104 @@ func TestOtSpanToOtlpSpan_Basic(t *testing.T) {
|
||||
DroppedAttributeCount: 1,
|
||||
DroppedMessageEventCount: 2,
|
||||
DroppedLinkCount: 3,
|
||||
Resource: resource.New(core.Key("rk1").String("rv1"),
|
||||
core.Key("rk2").Int64(5)),
|
||||
},
|
||||
otlpSpan: &tracepb.Span{
|
||||
TraceId: []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F},
|
||||
SpanId: []byte{0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA, 0xF9, 0xF8},
|
||||
ParentSpanId: []byte{0xEF, 0xEE, 0xED, 0xEC, 0xEB, 0xEA, 0xE9, 0xE8},
|
||||
Name: "End-To-End Here",
|
||||
Kind: tracepb.Span_SERVER,
|
||||
StartTimeUnixnano: uint64(startTime.Nanosecond()),
|
||||
EndTimeUnixnano: uint64(endTime.Nanosecond()),
|
||||
Status: &tracepb.Status{
|
||||
Code: 13,
|
||||
Message: "utterly unrecognized",
|
||||
},
|
||||
Events: []*tracepb.Span_Event{
|
||||
{
|
||||
TimeUnixnano: uint64(startTime.Nanosecond()),
|
||||
Attributes: []*commonpb.AttributeKeyValue{
|
||||
{
|
||||
Key: "CompressedByteSize",
|
||||
Type: commonpb.AttributeKeyValue_INT,
|
||||
StringValue: "",
|
||||
IntValue: 512,
|
||||
DoubleValue: 0,
|
||||
BoolValue: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
TimeUnixnano: uint64(endTime.Nanosecond()),
|
||||
Attributes: []*commonpb.AttributeKeyValue{
|
||||
{
|
||||
Key: "MessageEventType",
|
||||
Type: commonpb.AttributeKeyValue_STRING,
|
||||
StringValue: "Recv",
|
||||
IntValue: 0,
|
||||
DoubleValue: 0,
|
||||
BoolValue: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Links: []*tracepb.Span_Link{
|
||||
{
|
||||
TraceId: []byte{0xC0, 0xC1, 0xC2, 0xC3, 0xC4, 0xC5, 0xC6, 0xC7, 0xC8, 0xC9, 0xCA, 0xCB, 0xCC, 0xCD, 0xCE, 0xCF},
|
||||
SpanId: []byte{0xB0, 0xB1, 0xB2, 0xB3, 0xB4, 0xB5, 0xB6, 0xB7},
|
||||
Attributes: []*commonpb.AttributeKeyValue{
|
||||
{
|
||||
Key: "LinkType",
|
||||
Type: commonpb.AttributeKeyValue_STRING,
|
||||
StringValue: "Parent",
|
||||
IntValue: 0,
|
||||
DoubleValue: 0,
|
||||
BoolValue: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
TraceId: []byte{0xE0, 0xE1, 0xE2, 0xE3, 0xE4, 0xE5, 0xE6, 0xE7, 0xE8, 0xE9, 0xEA, 0xEB, 0xEC, 0xED, 0xEE, 0xEF},
|
||||
SpanId: []byte{0xD0, 0xD1, 0xD2, 0xD3, 0xD4, 0xD5, 0xD6, 0xD7},
|
||||
Attributes: []*commonpb.AttributeKeyValue{
|
||||
{
|
||||
Key: "LinkType",
|
||||
Type: commonpb.AttributeKeyValue_STRING,
|
||||
StringValue: "Child",
|
||||
IntValue: 0,
|
||||
DoubleValue: 0,
|
||||
BoolValue: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
otlpResSpan: &tracepb.ResourceSpans{
|
||||
Spans: []*tracepb.Span{
|
||||
{
|
||||
TraceId: []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F},
|
||||
SpanId: []byte{0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA, 0xF9, 0xF8},
|
||||
ParentSpanId: []byte{0xEF, 0xEE, 0xED, 0xEC, 0xEB, 0xEA, 0xE9, 0xE8},
|
||||
Name: "End-To-End Here",
|
||||
Kind: tracepb.Span_SERVER,
|
||||
StartTimeUnixnano: uint64(startTime.Nanosecond()),
|
||||
EndTimeUnixnano: uint64(endTime.Nanosecond()),
|
||||
Status: &tracepb.Status{
|
||||
Code: 13,
|
||||
Message: "utterly unrecognized",
|
||||
},
|
||||
Events: []*tracepb.Span_Event{
|
||||
{
|
||||
TimeUnixnano: uint64(startTime.Nanosecond()),
|
||||
Attributes: []*commonpb.AttributeKeyValue{
|
||||
{
|
||||
Key: "CompressedByteSize",
|
||||
Type: commonpb.AttributeKeyValue_INT,
|
||||
IntValue: 512,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
TimeUnixnano: uint64(endTime.Nanosecond()),
|
||||
Attributes: []*commonpb.AttributeKeyValue{
|
||||
{
|
||||
Key: "MessageEventType",
|
||||
Type: commonpb.AttributeKeyValue_STRING,
|
||||
StringValue: "Recv",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Links: []*tracepb.Span_Link{
|
||||
{
|
||||
TraceId: []byte{0xC0, 0xC1, 0xC2, 0xC3, 0xC4, 0xC5, 0xC6, 0xC7, 0xC8, 0xC9, 0xCA, 0xCB, 0xCC, 0xCD, 0xCE, 0xCF},
|
||||
SpanId: []byte{0xB0, 0xB1, 0xB2, 0xB3, 0xB4, 0xB5, 0xB6, 0xB7},
|
||||
Attributes: []*commonpb.AttributeKeyValue{
|
||||
{
|
||||
Key: "LinkType",
|
||||
Type: commonpb.AttributeKeyValue_STRING,
|
||||
StringValue: "Parent",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
TraceId: []byte{0xE0, 0xE1, 0xE2, 0xE3, 0xE4, 0xE5, 0xE6, 0xE7, 0xE8, 0xE9, 0xEA, 0xEB, 0xEC, 0xED, 0xEE, 0xEF},
|
||||
SpanId: []byte{0xD0, 0xD1, 0xD2, 0xD3, 0xD4, 0xD5, 0xD6, 0xD7},
|
||||
Attributes: []*commonpb.AttributeKeyValue{
|
||||
{
|
||||
Key: "LinkType",
|
||||
Type: commonpb.AttributeKeyValue_STRING,
|
||||
StringValue: "Child",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Attributes: []*commonpb.AttributeKeyValue{
|
||||
{
|
||||
Key: "timeout_ns",
|
||||
Type: commonpb.AttributeKeyValue_INT,
|
||||
IntValue: 12e9,
|
||||
},
|
||||
},
|
||||
DroppedAttributesCount: 1,
|
||||
DroppedEventsCount: 2,
|
||||
DroppedLinksCount: 3,
|
||||
},
|
||||
},
|
||||
Resource: &resourcepb.Resource{
|
||||
Attributes: []*commonpb.AttributeKeyValue{
|
||||
{
|
||||
Key: "timeout_ns",
|
||||
Type: commonpb.AttributeKeyValue_INT,
|
||||
StringValue: "",
|
||||
IntValue: 12e9,
|
||||
DoubleValue: 0,
|
||||
BoolValue: false,
|
||||
Key: "rk1",
|
||||
Type: commonpb.AttributeKeyValue_STRING,
|
||||
StringValue: "rv1",
|
||||
},
|
||||
{
|
||||
Key: "rk2",
|
||||
Type: commonpb.AttributeKeyValue_INT,
|
||||
IntValue: 5,
|
||||
},
|
||||
},
|
||||
DroppedAttributesCount: 1,
|
||||
DroppedEventsCount: 2,
|
||||
DroppedLinksCount: 3,
|
||||
},
|
||||
},
|
||||
}
|
||||
return tcs
|
||||
return tc
|
||||
})
|
||||
}
|
||||
|
||||
func TestOtSpanToOtlpSpan_SpanKind(t *testing.T) {
|
||||
testAndVerify("Test SpanKind", t, func(t *testing.T) []testCases {
|
||||
testAndVerify("Test SpanKind", t, func(t *testing.T) testCases {
|
||||
kinds := []struct {
|
||||
in apitrace.SpanKind
|
||||
out tracepb.Span_SpanKind
|
||||
@@ -223,23 +231,28 @@ func TestOtSpanToOtlpSpan_SpanKind(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
tcs := make([]testCases, 0, len(kinds))
|
||||
otSpans := make([]*export.SpanData, 0, len(kinds))
|
||||
otlpSpans := make([]*tracepb.Span, 0, len(kinds))
|
||||
for _, kind := range kinds {
|
||||
otSpan, otlpSpan := getSpan()
|
||||
otSpan.SpanKind = kind.in
|
||||
otlpSpan.Kind = kind.out
|
||||
tc := testCases{
|
||||
otSpan: otSpan,
|
||||
otlpSpan: otlpSpan,
|
||||
}
|
||||
tcs = append(tcs, tc)
|
||||
otSpans = append(otSpans, otSpan)
|
||||
otlpSpans = append(otlpSpans, otlpSpan)
|
||||
}
|
||||
return tcs
|
||||
tc := testCases{
|
||||
otSpan: otSpans,
|
||||
otlpResSpan: &tracepb.ResourceSpans{
|
||||
Resource: nil,
|
||||
Spans: otlpSpans,
|
||||
},
|
||||
}
|
||||
return tc
|
||||
})
|
||||
}
|
||||
|
||||
func TestOtSpanToOtlpSpan_Attribute(t *testing.T) {
|
||||
testAndVerify("Test SpanAttribute", t, func(t *testing.T) []testCases {
|
||||
testAndVerify("Test SpanAttribute", t, func(t *testing.T) testCases {
|
||||
attrInt := &commonpb.AttributeKeyValue{
|
||||
Key: "commonInt",
|
||||
Type: commonpb.AttributeKeyValue_INT,
|
||||
@@ -316,18 +329,23 @@ func TestOtSpanToOtlpSpan_Attribute(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
tcs := make([]testCases, 0, len(kinds))
|
||||
otSpans := make([]*export.SpanData, 0, len(kinds))
|
||||
otlpSpans := make([]*tracepb.Span, 0, len(kinds))
|
||||
for _, kind := range kinds {
|
||||
otSpan, otlpSpan := getSpan()
|
||||
otSpan.Attributes = []core.KeyValue{kind.in}
|
||||
otlpSpan.Attributes = []*commonpb.AttributeKeyValue{kind.out}
|
||||
tc := testCases{
|
||||
otSpan: otSpan,
|
||||
otlpSpan: otlpSpan,
|
||||
}
|
||||
tcs = append(tcs, tc)
|
||||
otSpans = append(otSpans, otSpan)
|
||||
otlpSpans = append(otlpSpans, otlpSpan)
|
||||
}
|
||||
return tcs
|
||||
tc := testCases{
|
||||
otSpan: otSpans,
|
||||
otlpResSpan: &tracepb.ResourceSpans{
|
||||
Resource: nil,
|
||||
Spans: otlpSpans,
|
||||
},
|
||||
}
|
||||
return tc
|
||||
})
|
||||
}
|
||||
|
||||
@@ -362,7 +380,7 @@ func getSpan() (*export.SpanData, *tracepb.Span) {
|
||||
return otSpan, otlpSpan
|
||||
}
|
||||
|
||||
func testAndVerify(name string, t *testing.T, f func(t *testing.T) []testCases) {
|
||||
func testAndVerify(name string, t *testing.T, f func(t *testing.T) testCases) {
|
||||
// The goal of this test is to ensure that each
|
||||
// spanData is transformed and exported correctly!
|
||||
|
||||
@@ -384,25 +402,20 @@ func testAndVerify(name string, t *testing.T, f func(t *testing.T) []testCases)
|
||||
// Give the background collector connection sometime to setup.
|
||||
<-time.After(20 * time.Millisecond)
|
||||
|
||||
tcs := f(t)
|
||||
|
||||
for _, tc := range tcs {
|
||||
exp.ExportSpans(context.Background(), []*export.SpanData{tc.otSpan})
|
||||
}
|
||||
tc := f(t)
|
||||
exp.ExportSpans(context.Background(), tc.otSpan)
|
||||
|
||||
_ = exp.Stop()
|
||||
_ = collector.stop()
|
||||
|
||||
spans := collector.getSpans()
|
||||
gotCount := len(spans)
|
||||
wantCount := len(tcs)
|
||||
rss := collector.getResourceSpans()
|
||||
gotCount := len(rss)
|
||||
wantCount := 1
|
||||
if gotCount != wantCount {
|
||||
t.Fatalf("%s: got %d spans, want %d spans", name, gotCount, wantCount)
|
||||
t.Fatalf("%s: ResourceSpans: got %d, want %d", name, gotCount, wantCount)
|
||||
}
|
||||
for i, tc := range tcs {
|
||||
exp.ExportSpans(context.Background(), []*export.SpanData{tc.otSpan})
|
||||
if diff := cmp.Diff(spans[i], tc.otlpSpan, cmp.Comparer(proto.Equal)); diff != "" {
|
||||
t.Fatalf("%s transformed span differs %v\n", name, diff)
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(rss[0], tc.otlpResSpan, cmp.Comparer(proto.Equal)); diff != "" {
|
||||
t.Fatalf("%s transformed span differs %v\n", name, diff)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -116,7 +116,8 @@ func TestExporter_ExportSpan(t *testing.T) {
|
||||
`"DroppedAttributeCount":0,` +
|
||||
`"DroppedMessageEventCount":0,` +
|
||||
`"DroppedLinkCount":0,` +
|
||||
`"ChildSpanCount":0}` + "\n"
|
||||
`"ChildSpanCount":0,` +
|
||||
`"Resource":null}` + "\n"
|
||||
|
||||
if got != expectedOutput {
|
||||
t.Errorf("Want: %v but got: %v", expectedOutput, got)
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/api/core"
|
||||
apitrace "go.opentelemetry.io/otel/api/trace"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
// SpanSyncer is a type for functions that receive a single sampled trace span.
|
||||
@@ -67,6 +68,9 @@ type SpanData struct {
|
||||
|
||||
// ChildSpanCount holds the number of child span created for this span.
|
||||
ChildSpanCount int
|
||||
|
||||
// Resource contains attributes representing an entity that produced this span.
|
||||
Resource *resource.Resource
|
||||
}
|
||||
|
||||
// Event is used to describe an Event with a message string and set of
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
package trace
|
||||
|
||||
import (
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
"go.opentelemetry.io/otel/sdk/trace/internal"
|
||||
)
|
||||
|
||||
@@ -34,6 +35,9 @@ type Config struct {
|
||||
|
||||
// MaxLinksPerSpan is max number of links per span
|
||||
MaxLinksPerSpan int
|
||||
|
||||
// Resource contains attributes representing an entity that produces telemetry.
|
||||
Resource *resource.Resource
|
||||
}
|
||||
|
||||
const (
|
||||
|
||||
@@ -19,7 +19,9 @@ import (
|
||||
"sync/atomic"
|
||||
|
||||
export "go.opentelemetry.io/otel/sdk/export/trace"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
|
||||
"go.opentelemetry.io/otel/api/core"
|
||||
apitrace "go.opentelemetry.io/otel/api/trace"
|
||||
)
|
||||
|
||||
@@ -160,6 +162,9 @@ func (p *Provider) ApplyConfig(cfg Config) {
|
||||
if cfg.MaxLinksPerSpan > 0 {
|
||||
c.MaxLinksPerSpan = cfg.MaxLinksPerSpan
|
||||
}
|
||||
if cfg.Resource != nil {
|
||||
c.Resource = resource.New(cfg.Resource.Attributes()...)
|
||||
}
|
||||
p.config.Store(&c)
|
||||
}
|
||||
|
||||
@@ -189,3 +194,11 @@ func WithConfig(config Config) ProviderOption {
|
||||
opts.config = config
|
||||
}
|
||||
}
|
||||
|
||||
// WithResourceAttributes option sets the resource attributes to the provider.
|
||||
// Resource is added to the span when it is started.
|
||||
func WithResourceAttributes(attrs ...core.KeyValue) ProviderOption {
|
||||
return func(opts *ProviderOptions) {
|
||||
opts.config.Resource = resource.New(attrs...)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -339,6 +339,7 @@ func startSpanInternal(tr *tracer, name string, parent core.SpanContext, remoteP
|
||||
SpanKind: apitrace.ValidateSpanKind(o.SpanKind),
|
||||
Name: name,
|
||||
HasRemoteParent: remoteParent,
|
||||
Resource: cfg.Resource,
|
||||
}
|
||||
span.attributes = newAttributesMap(cfg.MaxAttributesPerSpan)
|
||||
span.messageEvents = newEvictedQueue(cfg.MaxEventsPerSpan)
|
||||
|
||||
+36
-1
@@ -34,6 +34,7 @@ import (
|
||||
apitrace "go.opentelemetry.io/otel/api/trace"
|
||||
ottest "go.opentelemetry.io/otel/internal/testing"
|
||||
export "go.opentelemetry.io/otel/sdk/export/trace"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -598,7 +599,10 @@ func TestSetSpanStatus(t *testing.T) {
|
||||
}
|
||||
|
||||
func cmpDiff(x, y interface{}) string {
|
||||
return cmp.Diff(x, y, cmp.AllowUnexported(core.Value{}), cmp.AllowUnexported(export.Event{}))
|
||||
return cmp.Diff(x, y,
|
||||
cmp.AllowUnexported(core.Value{}),
|
||||
cmp.AllowUnexported(export.Event{}),
|
||||
cmp.AllowUnexported(resource.Resource{}))
|
||||
}
|
||||
|
||||
func remoteSpanContext() core.SpanContext {
|
||||
@@ -1058,3 +1062,34 @@ func TestWithSpanKind(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestWithResource(t *testing.T) {
|
||||
var te testExporter
|
||||
tp, _ := NewProvider(WithSyncer(&te),
|
||||
WithConfig(Config{DefaultSampler: AlwaysSample()}),
|
||||
WithResourceAttributes(key.String("rk1", "rv1"), key.Int64("rk2", 5)))
|
||||
span := startSpan(tp, "WithResource")
|
||||
span.SetAttributes(core.Key("key1").String("value1"))
|
||||
got, err := endSpan(&te, span)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
|
||||
want := &export.SpanData{
|
||||
SpanContext: core.SpanContext{
|
||||
TraceID: tid,
|
||||
TraceFlags: 0x1,
|
||||
},
|
||||
ParentSpanID: sid,
|
||||
Name: "span0",
|
||||
Attributes: []core.KeyValue{
|
||||
key.String("key1", "value1"),
|
||||
},
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
HasRemoteParent: true,
|
||||
Resource: resource.New(key.String("rk1", "rv1"), key.Int64("rk2", 5)),
|
||||
}
|
||||
if diff := cmpDiff(got, want); diff != "" {
|
||||
t.Errorf("WithResource:\n -got +want %s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user