mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-30 04:40:41 +02:00
Fix duplicate logs across resources (#5803)
1. Create scope map with resource key to map the correct log record. 2. Add test case with different resource and scope combination Fixes #5782 ### Benchmarks ``` goos: darwin goarch: arm64 pkg: go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/transform │ old.txt │ new.txt │ │ sec/op │ sec/op vs base │ ResourceLogs-8 3.266µ ± 3% 1.100µ ± 5% -66.33% (p=0.000 n=10) │ old.txt │ new.txt │ │ B/op │ B/op vs base │ ResourceLogs-8 8.297Ki ± 0% 2.430Ki ± 0% -70.72% (p=0.000 n=10) │ old.txt │ new.txt │ │ allocs/op │ allocs/op vs base │ ResourceLogs-8 178.00 ± 0% 52.00 ± 0% -70.79% (p=0.000 n=10) ``` --------- Co-authored-by: Sam Xie <sam@samxie.me>
This commit is contained in:
parent
42fd8fe325
commit
534ce5ab09
@ -17,6 +17,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
### Fixed
|
||||
|
||||
- The race condition for multiple `FixedSize` exemplar reservoirs identified in #5814 is resolved. (#5819)
|
||||
- Fix log records duplication in case of heterogeneous resource attributes by correctly mapping each log record to it's resource and scope. (#5803)
|
||||
|
||||
<!-- Released section -->
|
||||
<!-- Don't change this section unless doing release -->
|
||||
|
@ -9,7 +9,6 @@
|
||||
package transform // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/transform"
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cpb "go.opentelemetry.io/proto/otlp/common/v1"
|
||||
@ -28,71 +27,25 @@ func ResourceLogs(records []log.Record) []*lpb.ResourceLogs {
|
||||
return nil
|
||||
}
|
||||
|
||||
resMap := resourceLogsMapPool.Get().(map[attribute.Distinct]*lpb.ResourceLogs)
|
||||
defer func() {
|
||||
clear(resMap)
|
||||
resourceLogsMapPool.Put(resMap)
|
||||
}()
|
||||
resourceLogsMap(&resMap, records)
|
||||
resMap := make(map[attribute.Distinct]*lpb.ResourceLogs)
|
||||
|
||||
out := make([]*lpb.ResourceLogs, 0, len(resMap))
|
||||
for _, rl := range resMap {
|
||||
out = append(out, rl)
|
||||
}
|
||||
return out
|
||||
type key struct {
|
||||
r attribute.Distinct
|
||||
is instrumentation.Scope
|
||||
}
|
||||
scopeMap := make(map[key]*lpb.ScopeLogs)
|
||||
|
||||
var resourceLogsMapPool = sync.Pool{
|
||||
New: func() any {
|
||||
return make(map[attribute.Distinct]*lpb.ResourceLogs)
|
||||
},
|
||||
}
|
||||
|
||||
func resourceLogsMap(dst *map[attribute.Distinct]*lpb.ResourceLogs, records []log.Record) {
|
||||
var resources int
|
||||
for _, r := range records {
|
||||
res := r.Resource()
|
||||
rl, ok := (*dst)[res.Equivalent()]
|
||||
if !ok {
|
||||
rl = new(lpb.ResourceLogs)
|
||||
if res.Len() > 0 {
|
||||
rl.Resource = &rpb.Resource{
|
||||
Attributes: AttrIter(res.Iter()),
|
||||
}
|
||||
}
|
||||
rl.SchemaUrl = res.SchemaURL()
|
||||
(*dst)[res.Equivalent()] = rl
|
||||
}
|
||||
rl.ScopeLogs = ScopeLogs(records)
|
||||
}
|
||||
}
|
||||
|
||||
// ScopeLogs returns a slice of OTLP ScopeLogs generated from records.
|
||||
func ScopeLogs(records []log.Record) []*lpb.ScopeLogs {
|
||||
scopeMap := scopeLogsMapPool.Get().(map[instrumentation.Scope]*lpb.ScopeLogs)
|
||||
defer func() {
|
||||
clear(scopeMap)
|
||||
scopeLogsMapPool.Put(scopeMap)
|
||||
}()
|
||||
scopeLogsMap(&scopeMap, records)
|
||||
|
||||
out := make([]*lpb.ScopeLogs, 0, len(scopeMap))
|
||||
for _, sl := range scopeMap {
|
||||
out = append(out, sl)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
var scopeLogsMapPool = sync.Pool{
|
||||
New: func() any {
|
||||
return make(map[instrumentation.Scope]*lpb.ScopeLogs)
|
||||
},
|
||||
}
|
||||
|
||||
func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.Record) {
|
||||
for _, r := range records {
|
||||
rKey := res.Equivalent()
|
||||
scope := r.InstrumentationScope()
|
||||
sl, ok := (*dst)[scope]
|
||||
if !ok {
|
||||
k := key{
|
||||
r: rKey,
|
||||
is: scope,
|
||||
}
|
||||
sl, iOk := scopeMap[k]
|
||||
if !iOk {
|
||||
sl = new(lpb.ScopeLogs)
|
||||
var emptyScope instrumentation.Scope
|
||||
if scope != emptyScope {
|
||||
@ -102,11 +55,35 @@ func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.R
|
||||
}
|
||||
sl.SchemaUrl = scope.SchemaURL
|
||||
}
|
||||
(*dst)[scope] = sl
|
||||
scopeMap[k] = sl
|
||||
}
|
||||
|
||||
sl.LogRecords = append(sl.LogRecords, LogRecord(r))
|
||||
rl, rOk := resMap[rKey]
|
||||
if !rOk {
|
||||
resources++
|
||||
rl = new(lpb.ResourceLogs)
|
||||
if res.Len() > 0 {
|
||||
rl.Resource = &rpb.Resource{
|
||||
Attributes: AttrIter(res.Iter()),
|
||||
}
|
||||
}
|
||||
rl.SchemaUrl = res.SchemaURL()
|
||||
resMap[rKey] = rl
|
||||
}
|
||||
if !iOk {
|
||||
rl.ScopeLogs = append(rl.ScopeLogs, sl)
|
||||
}
|
||||
}
|
||||
|
||||
// Transform the categorized map into a slice
|
||||
resLogs := make([]*lpb.ResourceLogs, 0, resources)
|
||||
for _, rl := range resMap {
|
||||
resLogs = append(resLogs, rl)
|
||||
}
|
||||
|
||||
return resLogs
|
||||
}
|
||||
|
||||
// LogRecord returns an OTLP LogRecord generated from record.
|
||||
func LogRecord(record log.Record) *lpb.LogRecord {
|
||||
|
@ -30,73 +30,106 @@ var (
|
||||
ts = time.Date(2000, time.January, 0o1, 0, 0, 0, 0, time.FixedZone("GMT", 0))
|
||||
obs = ts.Add(30 * time.Second)
|
||||
|
||||
tom = api.String("user", "tom")
|
||||
jerry = api.String("user", "jerry")
|
||||
// A time before unix 0.
|
||||
negativeTs = time.Date(1969, 7, 20, 20, 17, 0, 0, time.UTC)
|
||||
|
||||
alice = api.String("user", "alice")
|
||||
bob = api.String("user", "bob")
|
||||
|
||||
pbAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "alice"},
|
||||
pbTom = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "tom"},
|
||||
}}
|
||||
pbBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "bob"},
|
||||
pbJerry = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "jerry"},
|
||||
}}
|
||||
|
||||
sevA = api.SeverityInfo
|
||||
sevB = api.SeverityError
|
||||
sevC = api.SeverityInfo
|
||||
sevD = api.SeverityError
|
||||
|
||||
pbSevA = lpb.SeverityNumber_SEVERITY_NUMBER_INFO
|
||||
pbSevB = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR
|
||||
pbSevC = lpb.SeverityNumber_SEVERITY_NUMBER_INFO
|
||||
pbSevD = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR
|
||||
|
||||
bodyA = api.StringValue("a")
|
||||
bodyB = api.StringValue("b")
|
||||
bodyC = api.StringValue("c")
|
||||
bodyD = api.StringValue("d")
|
||||
|
||||
pbBodyA = &cpb.AnyValue{
|
||||
pbBodyC = &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{
|
||||
StringValue: "a",
|
||||
StringValue: "c",
|
||||
},
|
||||
}
|
||||
pbBodyB = &cpb.AnyValue{
|
||||
pbBodyD = &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{
|
||||
StringValue: "b",
|
||||
StringValue: "d",
|
||||
},
|
||||
}
|
||||
|
||||
spanIDA = []byte{0, 0, 0, 0, 0, 0, 0, 1}
|
||||
spanIDB = []byte{0, 0, 0, 0, 0, 0, 0, 2}
|
||||
traceIDA = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
|
||||
traceIDB = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}
|
||||
flagsA = byte(1)
|
||||
flagsB = byte(0)
|
||||
spanIDC = []byte{0, 0, 0, 0, 0, 0, 0, 1}
|
||||
spanIDD = []byte{0, 0, 0, 0, 0, 0, 0, 2}
|
||||
traceIDC = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
|
||||
traceIDD = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}
|
||||
flagsC = byte(1)
|
||||
flagsD = byte(0)
|
||||
|
||||
scope = instrumentation.Scope{
|
||||
Name: "test/code/path",
|
||||
Version: "v0.1.0",
|
||||
Name: "otel/test/code/path1",
|
||||
Version: "v0.1.1",
|
||||
SchemaURL: semconv.SchemaURL,
|
||||
}
|
||||
scope2 = instrumentation.Scope{
|
||||
Name: "otel/test/code/path2",
|
||||
Version: "v0.2.2",
|
||||
SchemaURL: semconv.SchemaURL,
|
||||
}
|
||||
scopeList = []instrumentation.Scope{scope, scope2}
|
||||
|
||||
pbScope = &cpb.InstrumentationScope{
|
||||
Name: "test/code/path",
|
||||
Version: "v0.1.0",
|
||||
Name: "otel/test/code/path1",
|
||||
Version: "v0.1.1",
|
||||
}
|
||||
pbScope2 = &cpb.InstrumentationScope{
|
||||
Name: "otel/test/code/path2",
|
||||
Version: "v0.2.2",
|
||||
}
|
||||
|
||||
res = resource.NewWithAttributes(
|
||||
semconv.SchemaURL,
|
||||
semconv.ServiceName("test server"),
|
||||
semconv.ServiceVersion("v0.1.0"),
|
||||
semconv.ServiceName("service1"),
|
||||
semconv.ServiceVersion("v0.1.1"),
|
||||
)
|
||||
res2 = resource.NewWithAttributes(
|
||||
semconv.SchemaURL,
|
||||
semconv.ServiceName("service2"),
|
||||
semconv.ServiceVersion("v0.2.2"),
|
||||
)
|
||||
resList = []*resource.Resource{res, res2}
|
||||
|
||||
pbRes = &rpb.Resource{
|
||||
Attributes: []*cpb.KeyValue{
|
||||
{
|
||||
Key: "service.name",
|
||||
Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "test server"},
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "service1"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Key: "service.version",
|
||||
Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"},
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
pbRes2 = &rpb.Resource{
|
||||
Attributes: []*cpb.KeyValue{
|
||||
{
|
||||
Key: "service.name",
|
||||
Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "service2"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Key: "service.version",
|
||||
Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "v0.2.2"},
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -105,75 +138,79 @@ var (
|
||||
records = func() []log.Record {
|
||||
var out []log.Record
|
||||
|
||||
for _, r := range resList {
|
||||
for _, s := range scopeList {
|
||||
out = append(out, logtest.RecordFactory{
|
||||
Timestamp: ts,
|
||||
ObservedTimestamp: obs,
|
||||
Severity: sevA,
|
||||
SeverityText: "A",
|
||||
Body: bodyA,
|
||||
Attributes: []api.KeyValue{alice},
|
||||
TraceID: trace.TraceID(traceIDA),
|
||||
SpanID: trace.SpanID(spanIDA),
|
||||
TraceFlags: trace.TraceFlags(flagsA),
|
||||
InstrumentationScope: &scope,
|
||||
Resource: res,
|
||||
Severity: sevC,
|
||||
SeverityText: "C",
|
||||
Body: bodyC,
|
||||
Attributes: []api.KeyValue{tom},
|
||||
TraceID: trace.TraceID(traceIDC),
|
||||
SpanID: trace.SpanID(spanIDC),
|
||||
TraceFlags: trace.TraceFlags(flagsC),
|
||||
InstrumentationScope: &s,
|
||||
Resource: r,
|
||||
}.NewRecord())
|
||||
|
||||
out = append(out, logtest.RecordFactory{
|
||||
Timestamp: ts,
|
||||
ObservedTimestamp: obs,
|
||||
Severity: sevA,
|
||||
SeverityText: "A",
|
||||
Body: bodyA,
|
||||
Attributes: []api.KeyValue{bob},
|
||||
TraceID: trace.TraceID(traceIDA),
|
||||
SpanID: trace.SpanID(spanIDA),
|
||||
TraceFlags: trace.TraceFlags(flagsA),
|
||||
InstrumentationScope: &scope,
|
||||
Resource: res,
|
||||
Severity: sevC,
|
||||
SeverityText: "C",
|
||||
Body: bodyC,
|
||||
Attributes: []api.KeyValue{jerry},
|
||||
TraceID: trace.TraceID(traceIDC),
|
||||
SpanID: trace.SpanID(spanIDC),
|
||||
TraceFlags: trace.TraceFlags(flagsC),
|
||||
InstrumentationScope: &s,
|
||||
Resource: r,
|
||||
}.NewRecord())
|
||||
|
||||
out = append(out, logtest.RecordFactory{
|
||||
Timestamp: ts,
|
||||
ObservedTimestamp: obs,
|
||||
Severity: sevB,
|
||||
SeverityText: "B",
|
||||
Body: bodyB,
|
||||
Attributes: []api.KeyValue{alice},
|
||||
TraceID: trace.TraceID(traceIDB),
|
||||
SpanID: trace.SpanID(spanIDB),
|
||||
TraceFlags: trace.TraceFlags(flagsB),
|
||||
InstrumentationScope: &scope,
|
||||
Resource: res,
|
||||
Severity: sevD,
|
||||
SeverityText: "D",
|
||||
Body: bodyD,
|
||||
Attributes: []api.KeyValue{tom},
|
||||
TraceID: trace.TraceID(traceIDD),
|
||||
SpanID: trace.SpanID(spanIDD),
|
||||
TraceFlags: trace.TraceFlags(flagsD),
|
||||
InstrumentationScope: &s,
|
||||
Resource: r,
|
||||
}.NewRecord())
|
||||
|
||||
out = append(out, logtest.RecordFactory{
|
||||
Timestamp: ts,
|
||||
ObservedTimestamp: obs,
|
||||
Severity: sevB,
|
||||
SeverityText: "B",
|
||||
Body: bodyB,
|
||||
Attributes: []api.KeyValue{bob},
|
||||
TraceID: trace.TraceID(traceIDB),
|
||||
SpanID: trace.SpanID(spanIDB),
|
||||
TraceFlags: trace.TraceFlags(flagsB),
|
||||
InstrumentationScope: &scope,
|
||||
Resource: res,
|
||||
Severity: sevD,
|
||||
SeverityText: "D",
|
||||
Body: bodyD,
|
||||
Attributes: []api.KeyValue{jerry},
|
||||
TraceID: trace.TraceID(traceIDD),
|
||||
SpanID: trace.SpanID(spanIDD),
|
||||
TraceFlags: trace.TraceFlags(flagsD),
|
||||
InstrumentationScope: &s,
|
||||
Resource: r,
|
||||
}.NewRecord())
|
||||
|
||||
out = append(out, logtest.RecordFactory{
|
||||
Timestamp: negativeTs,
|
||||
ObservedTimestamp: obs,
|
||||
Severity: sevB,
|
||||
SeverityText: "B",
|
||||
Body: bodyB,
|
||||
Attributes: []api.KeyValue{bob},
|
||||
TraceID: trace.TraceID(traceIDB),
|
||||
SpanID: trace.SpanID(spanIDB),
|
||||
TraceFlags: trace.TraceFlags(flagsB),
|
||||
InstrumentationScope: &scope,
|
||||
Resource: res,
|
||||
Severity: sevD,
|
||||
SeverityText: "D",
|
||||
Body: bodyD,
|
||||
Attributes: []api.KeyValue{jerry},
|
||||
TraceID: trace.TraceID(traceIDD),
|
||||
SpanID: trace.SpanID(spanIDD),
|
||||
TraceFlags: trace.TraceFlags(flagsD),
|
||||
InstrumentationScope: &s,
|
||||
Resource: r,
|
||||
}.NewRecord())
|
||||
}
|
||||
}
|
||||
|
||||
return out
|
||||
}()
|
||||
@ -182,76 +219,90 @@ var (
|
||||
{
|
||||
TimeUnixNano: uint64(ts.UnixNano()),
|
||||
ObservedTimeUnixNano: uint64(obs.UnixNano()),
|
||||
SeverityNumber: pbSevA,
|
||||
SeverityText: "A",
|
||||
Body: pbBodyA,
|
||||
Attributes: []*cpb.KeyValue{pbAlice},
|
||||
Flags: uint32(flagsA),
|
||||
TraceId: traceIDA,
|
||||
SpanId: spanIDA,
|
||||
SeverityNumber: pbSevC,
|
||||
SeverityText: "C",
|
||||
Body: pbBodyC,
|
||||
Attributes: []*cpb.KeyValue{pbTom},
|
||||
Flags: uint32(flagsC),
|
||||
TraceId: traceIDC,
|
||||
SpanId: spanIDC,
|
||||
},
|
||||
{
|
||||
TimeUnixNano: uint64(ts.UnixNano()),
|
||||
ObservedTimeUnixNano: uint64(obs.UnixNano()),
|
||||
SeverityNumber: pbSevA,
|
||||
SeverityText: "A",
|
||||
Body: pbBodyA,
|
||||
Attributes: []*cpb.KeyValue{pbBob},
|
||||
Flags: uint32(flagsA),
|
||||
TraceId: traceIDA,
|
||||
SpanId: spanIDA,
|
||||
SeverityNumber: pbSevC,
|
||||
SeverityText: "C",
|
||||
Body: pbBodyC,
|
||||
Attributes: []*cpb.KeyValue{pbJerry},
|
||||
Flags: uint32(flagsC),
|
||||
TraceId: traceIDC,
|
||||
SpanId: spanIDC,
|
||||
},
|
||||
{
|
||||
TimeUnixNano: uint64(ts.UnixNano()),
|
||||
ObservedTimeUnixNano: uint64(obs.UnixNano()),
|
||||
SeverityNumber: pbSevB,
|
||||
SeverityText: "B",
|
||||
Body: pbBodyB,
|
||||
Attributes: []*cpb.KeyValue{pbAlice},
|
||||
Flags: uint32(flagsB),
|
||||
TraceId: traceIDB,
|
||||
SpanId: spanIDB,
|
||||
SeverityNumber: pbSevD,
|
||||
SeverityText: "D",
|
||||
Body: pbBodyD,
|
||||
Attributes: []*cpb.KeyValue{pbTom},
|
||||
Flags: uint32(flagsD),
|
||||
TraceId: traceIDD,
|
||||
SpanId: spanIDD,
|
||||
},
|
||||
{
|
||||
TimeUnixNano: uint64(ts.UnixNano()),
|
||||
ObservedTimeUnixNano: uint64(obs.UnixNano()),
|
||||
SeverityNumber: pbSevB,
|
||||
SeverityText: "B",
|
||||
Body: pbBodyB,
|
||||
Attributes: []*cpb.KeyValue{pbBob},
|
||||
Flags: uint32(flagsB),
|
||||
TraceId: traceIDB,
|
||||
SpanId: spanIDB,
|
||||
SeverityNumber: pbSevD,
|
||||
SeverityText: "D",
|
||||
Body: pbBodyD,
|
||||
Attributes: []*cpb.KeyValue{pbJerry},
|
||||
Flags: uint32(flagsD),
|
||||
TraceId: traceIDD,
|
||||
SpanId: spanIDD,
|
||||
},
|
||||
{
|
||||
TimeUnixNano: 0,
|
||||
ObservedTimeUnixNano: uint64(obs.UnixNano()),
|
||||
SeverityNumber: pbSevB,
|
||||
SeverityText: "B",
|
||||
Body: pbBodyB,
|
||||
Attributes: []*cpb.KeyValue{pbBob},
|
||||
Flags: uint32(flagsB),
|
||||
TraceId: traceIDB,
|
||||
SpanId: spanIDB,
|
||||
SeverityNumber: pbSevD,
|
||||
SeverityText: "D",
|
||||
Body: pbBodyD,
|
||||
Attributes: []*cpb.KeyValue{pbJerry},
|
||||
Flags: uint32(flagsD),
|
||||
TraceId: traceIDD,
|
||||
SpanId: spanIDD,
|
||||
},
|
||||
}
|
||||
|
||||
pbScopeLogs = &lpb.ScopeLogs{
|
||||
pbScopeLogsList = []*lpb.ScopeLogs{
|
||||
{
|
||||
Scope: pbScope,
|
||||
SchemaUrl: semconv.SchemaURL,
|
||||
LogRecords: pbLogRecords,
|
||||
},
|
||||
{
|
||||
Scope: pbScope2,
|
||||
SchemaUrl: semconv.SchemaURL,
|
||||
LogRecords: pbLogRecords,
|
||||
},
|
||||
}
|
||||
|
||||
pbResourceLogs = &lpb.ResourceLogs{
|
||||
pbResourceLogsList = []*lpb.ResourceLogs{
|
||||
{
|
||||
Resource: pbRes,
|
||||
SchemaUrl: semconv.SchemaURL,
|
||||
ScopeLogs: []*lpb.ScopeLogs{pbScopeLogs},
|
||||
ScopeLogs: pbScopeLogsList,
|
||||
},
|
||||
{
|
||||
Resource: pbRes2,
|
||||
SchemaUrl: semconv.SchemaURL,
|
||||
ScopeLogs: pbScopeLogsList,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func TestResourceLogs(t *testing.T) {
|
||||
want := []*lpb.ResourceLogs{pbResourceLogs}
|
||||
assert.Equal(t, want, ResourceLogs(records))
|
||||
want := pbResourceLogsList
|
||||
assert.ElementsMatch(t, want, ResourceLogs(records))
|
||||
}
|
||||
|
||||
func TestSeverityNumber(t *testing.T) {
|
||||
|
@ -9,7 +9,6 @@
|
||||
package transform // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/transform"
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cpb "go.opentelemetry.io/proto/otlp/common/v1"
|
||||
@ -28,71 +27,25 @@ func ResourceLogs(records []log.Record) []*lpb.ResourceLogs {
|
||||
return nil
|
||||
}
|
||||
|
||||
resMap := resourceLogsMapPool.Get().(map[attribute.Distinct]*lpb.ResourceLogs)
|
||||
defer func() {
|
||||
clear(resMap)
|
||||
resourceLogsMapPool.Put(resMap)
|
||||
}()
|
||||
resourceLogsMap(&resMap, records)
|
||||
resMap := make(map[attribute.Distinct]*lpb.ResourceLogs)
|
||||
|
||||
out := make([]*lpb.ResourceLogs, 0, len(resMap))
|
||||
for _, rl := range resMap {
|
||||
out = append(out, rl)
|
||||
}
|
||||
return out
|
||||
type key struct {
|
||||
r attribute.Distinct
|
||||
is instrumentation.Scope
|
||||
}
|
||||
scopeMap := make(map[key]*lpb.ScopeLogs)
|
||||
|
||||
var resourceLogsMapPool = sync.Pool{
|
||||
New: func() any {
|
||||
return make(map[attribute.Distinct]*lpb.ResourceLogs)
|
||||
},
|
||||
}
|
||||
|
||||
func resourceLogsMap(dst *map[attribute.Distinct]*lpb.ResourceLogs, records []log.Record) {
|
||||
var resources int
|
||||
for _, r := range records {
|
||||
res := r.Resource()
|
||||
rl, ok := (*dst)[res.Equivalent()]
|
||||
if !ok {
|
||||
rl = new(lpb.ResourceLogs)
|
||||
if res.Len() > 0 {
|
||||
rl.Resource = &rpb.Resource{
|
||||
Attributes: AttrIter(res.Iter()),
|
||||
}
|
||||
}
|
||||
rl.SchemaUrl = res.SchemaURL()
|
||||
(*dst)[res.Equivalent()] = rl
|
||||
}
|
||||
rl.ScopeLogs = ScopeLogs(records)
|
||||
}
|
||||
}
|
||||
|
||||
// ScopeLogs returns a slice of OTLP ScopeLogs generated from records.
|
||||
func ScopeLogs(records []log.Record) []*lpb.ScopeLogs {
|
||||
scopeMap := scopeLogsMapPool.Get().(map[instrumentation.Scope]*lpb.ScopeLogs)
|
||||
defer func() {
|
||||
clear(scopeMap)
|
||||
scopeLogsMapPool.Put(scopeMap)
|
||||
}()
|
||||
scopeLogsMap(&scopeMap, records)
|
||||
|
||||
out := make([]*lpb.ScopeLogs, 0, len(scopeMap))
|
||||
for _, sl := range scopeMap {
|
||||
out = append(out, sl)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
var scopeLogsMapPool = sync.Pool{
|
||||
New: func() any {
|
||||
return make(map[instrumentation.Scope]*lpb.ScopeLogs)
|
||||
},
|
||||
}
|
||||
|
||||
func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.Record) {
|
||||
for _, r := range records {
|
||||
rKey := res.Equivalent()
|
||||
scope := r.InstrumentationScope()
|
||||
sl, ok := (*dst)[scope]
|
||||
if !ok {
|
||||
k := key{
|
||||
r: rKey,
|
||||
is: scope,
|
||||
}
|
||||
sl, iOk := scopeMap[k]
|
||||
if !iOk {
|
||||
sl = new(lpb.ScopeLogs)
|
||||
var emptyScope instrumentation.Scope
|
||||
if scope != emptyScope {
|
||||
@ -102,11 +55,35 @@ func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.R
|
||||
}
|
||||
sl.SchemaUrl = scope.SchemaURL
|
||||
}
|
||||
(*dst)[scope] = sl
|
||||
scopeMap[k] = sl
|
||||
}
|
||||
|
||||
sl.LogRecords = append(sl.LogRecords, LogRecord(r))
|
||||
rl, rOk := resMap[rKey]
|
||||
if !rOk {
|
||||
resources++
|
||||
rl = new(lpb.ResourceLogs)
|
||||
if res.Len() > 0 {
|
||||
rl.Resource = &rpb.Resource{
|
||||
Attributes: AttrIter(res.Iter()),
|
||||
}
|
||||
}
|
||||
rl.SchemaUrl = res.SchemaURL()
|
||||
resMap[rKey] = rl
|
||||
}
|
||||
if !iOk {
|
||||
rl.ScopeLogs = append(rl.ScopeLogs, sl)
|
||||
}
|
||||
}
|
||||
|
||||
// Transform the categorized map into a slice
|
||||
resLogs := make([]*lpb.ResourceLogs, 0, resources)
|
||||
for _, rl := range resMap {
|
||||
resLogs = append(resLogs, rl)
|
||||
}
|
||||
|
||||
return resLogs
|
||||
}
|
||||
|
||||
// LogRecord returns an OTLP LogRecord generated from record.
|
||||
func LogRecord(record log.Record) *lpb.LogRecord {
|
||||
|
@ -30,73 +30,106 @@ var (
|
||||
ts = time.Date(2000, time.January, 0o1, 0, 0, 0, 0, time.FixedZone("GMT", 0))
|
||||
obs = ts.Add(30 * time.Second)
|
||||
|
||||
tom = api.String("user", "tom")
|
||||
jerry = api.String("user", "jerry")
|
||||
// A time before unix 0.
|
||||
negativeTs = time.Date(1969, 7, 20, 20, 17, 0, 0, time.UTC)
|
||||
|
||||
alice = api.String("user", "alice")
|
||||
bob = api.String("user", "bob")
|
||||
|
||||
pbAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "alice"},
|
||||
pbTom = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "tom"},
|
||||
}}
|
||||
pbBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "bob"},
|
||||
pbJerry = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "jerry"},
|
||||
}}
|
||||
|
||||
sevA = api.SeverityInfo
|
||||
sevB = api.SeverityError
|
||||
sevC = api.SeverityInfo
|
||||
sevD = api.SeverityError
|
||||
|
||||
pbSevA = lpb.SeverityNumber_SEVERITY_NUMBER_INFO
|
||||
pbSevB = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR
|
||||
pbSevC = lpb.SeverityNumber_SEVERITY_NUMBER_INFO
|
||||
pbSevD = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR
|
||||
|
||||
bodyA = api.StringValue("a")
|
||||
bodyB = api.StringValue("b")
|
||||
bodyC = api.StringValue("c")
|
||||
bodyD = api.StringValue("d")
|
||||
|
||||
pbBodyA = &cpb.AnyValue{
|
||||
pbBodyC = &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{
|
||||
StringValue: "a",
|
||||
StringValue: "c",
|
||||
},
|
||||
}
|
||||
pbBodyB = &cpb.AnyValue{
|
||||
pbBodyD = &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{
|
||||
StringValue: "b",
|
||||
StringValue: "d",
|
||||
},
|
||||
}
|
||||
|
||||
spanIDA = []byte{0, 0, 0, 0, 0, 0, 0, 1}
|
||||
spanIDB = []byte{0, 0, 0, 0, 0, 0, 0, 2}
|
||||
traceIDA = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
|
||||
traceIDB = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}
|
||||
flagsA = byte(1)
|
||||
flagsB = byte(0)
|
||||
spanIDC = []byte{0, 0, 0, 0, 0, 0, 0, 1}
|
||||
spanIDD = []byte{0, 0, 0, 0, 0, 0, 0, 2}
|
||||
traceIDC = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
|
||||
traceIDD = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}
|
||||
flagsC = byte(1)
|
||||
flagsD = byte(0)
|
||||
|
||||
scope = instrumentation.Scope{
|
||||
Name: "test/code/path",
|
||||
Version: "v0.1.0",
|
||||
Name: "otel/test/code/path1",
|
||||
Version: "v0.1.1",
|
||||
SchemaURL: semconv.SchemaURL,
|
||||
}
|
||||
scope2 = instrumentation.Scope{
|
||||
Name: "otel/test/code/path2",
|
||||
Version: "v0.2.2",
|
||||
SchemaURL: semconv.SchemaURL,
|
||||
}
|
||||
scopeList = []instrumentation.Scope{scope, scope2}
|
||||
|
||||
pbScope = &cpb.InstrumentationScope{
|
||||
Name: "test/code/path",
|
||||
Version: "v0.1.0",
|
||||
Name: "otel/test/code/path1",
|
||||
Version: "v0.1.1",
|
||||
}
|
||||
pbScope2 = &cpb.InstrumentationScope{
|
||||
Name: "otel/test/code/path2",
|
||||
Version: "v0.2.2",
|
||||
}
|
||||
|
||||
res = resource.NewWithAttributes(
|
||||
semconv.SchemaURL,
|
||||
semconv.ServiceName("test server"),
|
||||
semconv.ServiceVersion("v0.1.0"),
|
||||
semconv.ServiceName("service1"),
|
||||
semconv.ServiceVersion("v0.1.1"),
|
||||
)
|
||||
res2 = resource.NewWithAttributes(
|
||||
semconv.SchemaURL,
|
||||
semconv.ServiceName("service2"),
|
||||
semconv.ServiceVersion("v0.2.2"),
|
||||
)
|
||||
resList = []*resource.Resource{res, res2}
|
||||
|
||||
pbRes = &rpb.Resource{
|
||||
Attributes: []*cpb.KeyValue{
|
||||
{
|
||||
Key: "service.name",
|
||||
Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "test server"},
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "service1"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Key: "service.version",
|
||||
Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"},
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
pbRes2 = &rpb.Resource{
|
||||
Attributes: []*cpb.KeyValue{
|
||||
{
|
||||
Key: "service.name",
|
||||
Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "service2"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Key: "service.version",
|
||||
Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "v0.2.2"},
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -105,75 +138,79 @@ var (
|
||||
records = func() []log.Record {
|
||||
var out []log.Record
|
||||
|
||||
for _, r := range resList {
|
||||
for _, s := range scopeList {
|
||||
out = append(out, logtest.RecordFactory{
|
||||
Timestamp: ts,
|
||||
ObservedTimestamp: obs,
|
||||
Severity: sevA,
|
||||
SeverityText: "A",
|
||||
Body: bodyA,
|
||||
Attributes: []api.KeyValue{alice},
|
||||
TraceID: trace.TraceID(traceIDA),
|
||||
SpanID: trace.SpanID(spanIDA),
|
||||
TraceFlags: trace.TraceFlags(flagsA),
|
||||
InstrumentationScope: &scope,
|
||||
Resource: res,
|
||||
Severity: sevC,
|
||||
SeverityText: "C",
|
||||
Body: bodyC,
|
||||
Attributes: []api.KeyValue{tom},
|
||||
TraceID: trace.TraceID(traceIDC),
|
||||
SpanID: trace.SpanID(spanIDC),
|
||||
TraceFlags: trace.TraceFlags(flagsC),
|
||||
InstrumentationScope: &s,
|
||||
Resource: r,
|
||||
}.NewRecord())
|
||||
|
||||
out = append(out, logtest.RecordFactory{
|
||||
Timestamp: ts,
|
||||
ObservedTimestamp: obs,
|
||||
Severity: sevA,
|
||||
SeverityText: "A",
|
||||
Body: bodyA,
|
||||
Attributes: []api.KeyValue{bob},
|
||||
TraceID: trace.TraceID(traceIDA),
|
||||
SpanID: trace.SpanID(spanIDA),
|
||||
TraceFlags: trace.TraceFlags(flagsA),
|
||||
InstrumentationScope: &scope,
|
||||
Resource: res,
|
||||
Severity: sevC,
|
||||
SeverityText: "C",
|
||||
Body: bodyC,
|
||||
Attributes: []api.KeyValue{jerry},
|
||||
TraceID: trace.TraceID(traceIDC),
|
||||
SpanID: trace.SpanID(spanIDC),
|
||||
TraceFlags: trace.TraceFlags(flagsC),
|
||||
InstrumentationScope: &s,
|
||||
Resource: r,
|
||||
}.NewRecord())
|
||||
|
||||
out = append(out, logtest.RecordFactory{
|
||||
Timestamp: ts,
|
||||
ObservedTimestamp: obs,
|
||||
Severity: sevB,
|
||||
SeverityText: "B",
|
||||
Body: bodyB,
|
||||
Attributes: []api.KeyValue{alice},
|
||||
TraceID: trace.TraceID(traceIDB),
|
||||
SpanID: trace.SpanID(spanIDB),
|
||||
TraceFlags: trace.TraceFlags(flagsB),
|
||||
InstrumentationScope: &scope,
|
||||
Resource: res,
|
||||
Severity: sevD,
|
||||
SeverityText: "D",
|
||||
Body: bodyD,
|
||||
Attributes: []api.KeyValue{tom},
|
||||
TraceID: trace.TraceID(traceIDD),
|
||||
SpanID: trace.SpanID(spanIDD),
|
||||
TraceFlags: trace.TraceFlags(flagsD),
|
||||
InstrumentationScope: &s,
|
||||
Resource: r,
|
||||
}.NewRecord())
|
||||
|
||||
out = append(out, logtest.RecordFactory{
|
||||
Timestamp: ts,
|
||||
ObservedTimestamp: obs,
|
||||
Severity: sevB,
|
||||
SeverityText: "B",
|
||||
Body: bodyB,
|
||||
Attributes: []api.KeyValue{bob},
|
||||
TraceID: trace.TraceID(traceIDB),
|
||||
SpanID: trace.SpanID(spanIDB),
|
||||
TraceFlags: trace.TraceFlags(flagsB),
|
||||
InstrumentationScope: &scope,
|
||||
Resource: res,
|
||||
Severity: sevD,
|
||||
SeverityText: "D",
|
||||
Body: bodyD,
|
||||
Attributes: []api.KeyValue{jerry},
|
||||
TraceID: trace.TraceID(traceIDD),
|
||||
SpanID: trace.SpanID(spanIDD),
|
||||
TraceFlags: trace.TraceFlags(flagsD),
|
||||
InstrumentationScope: &s,
|
||||
Resource: r,
|
||||
}.NewRecord())
|
||||
|
||||
out = append(out, logtest.RecordFactory{
|
||||
Timestamp: negativeTs,
|
||||
ObservedTimestamp: obs,
|
||||
Severity: sevB,
|
||||
SeverityText: "B",
|
||||
Body: bodyB,
|
||||
Attributes: []api.KeyValue{bob},
|
||||
TraceID: trace.TraceID(traceIDB),
|
||||
SpanID: trace.SpanID(spanIDB),
|
||||
TraceFlags: trace.TraceFlags(flagsB),
|
||||
InstrumentationScope: &scope,
|
||||
Resource: res,
|
||||
Severity: sevD,
|
||||
SeverityText: "D",
|
||||
Body: bodyD,
|
||||
Attributes: []api.KeyValue{jerry},
|
||||
TraceID: trace.TraceID(traceIDD),
|
||||
SpanID: trace.SpanID(spanIDD),
|
||||
TraceFlags: trace.TraceFlags(flagsD),
|
||||
InstrumentationScope: &s,
|
||||
Resource: r,
|
||||
}.NewRecord())
|
||||
}
|
||||
}
|
||||
|
||||
return out
|
||||
}()
|
||||
@ -182,76 +219,90 @@ var (
|
||||
{
|
||||
TimeUnixNano: uint64(ts.UnixNano()),
|
||||
ObservedTimeUnixNano: uint64(obs.UnixNano()),
|
||||
SeverityNumber: pbSevA,
|
||||
SeverityText: "A",
|
||||
Body: pbBodyA,
|
||||
Attributes: []*cpb.KeyValue{pbAlice},
|
||||
Flags: uint32(flagsA),
|
||||
TraceId: traceIDA,
|
||||
SpanId: spanIDA,
|
||||
SeverityNumber: pbSevC,
|
||||
SeverityText: "C",
|
||||
Body: pbBodyC,
|
||||
Attributes: []*cpb.KeyValue{pbTom},
|
||||
Flags: uint32(flagsC),
|
||||
TraceId: traceIDC,
|
||||
SpanId: spanIDC,
|
||||
},
|
||||
{
|
||||
TimeUnixNano: uint64(ts.UnixNano()),
|
||||
ObservedTimeUnixNano: uint64(obs.UnixNano()),
|
||||
SeverityNumber: pbSevA,
|
||||
SeverityText: "A",
|
||||
Body: pbBodyA,
|
||||
Attributes: []*cpb.KeyValue{pbBob},
|
||||
Flags: uint32(flagsA),
|
||||
TraceId: traceIDA,
|
||||
SpanId: spanIDA,
|
||||
SeverityNumber: pbSevC,
|
||||
SeverityText: "C",
|
||||
Body: pbBodyC,
|
||||
Attributes: []*cpb.KeyValue{pbJerry},
|
||||
Flags: uint32(flagsC),
|
||||
TraceId: traceIDC,
|
||||
SpanId: spanIDC,
|
||||
},
|
||||
{
|
||||
TimeUnixNano: uint64(ts.UnixNano()),
|
||||
ObservedTimeUnixNano: uint64(obs.UnixNano()),
|
||||
SeverityNumber: pbSevB,
|
||||
SeverityText: "B",
|
||||
Body: pbBodyB,
|
||||
Attributes: []*cpb.KeyValue{pbAlice},
|
||||
Flags: uint32(flagsB),
|
||||
TraceId: traceIDB,
|
||||
SpanId: spanIDB,
|
||||
SeverityNumber: pbSevD,
|
||||
SeverityText: "D",
|
||||
Body: pbBodyD,
|
||||
Attributes: []*cpb.KeyValue{pbTom},
|
||||
Flags: uint32(flagsD),
|
||||
TraceId: traceIDD,
|
||||
SpanId: spanIDD,
|
||||
},
|
||||
{
|
||||
TimeUnixNano: uint64(ts.UnixNano()),
|
||||
ObservedTimeUnixNano: uint64(obs.UnixNano()),
|
||||
SeverityNumber: pbSevB,
|
||||
SeverityText: "B",
|
||||
Body: pbBodyB,
|
||||
Attributes: []*cpb.KeyValue{pbBob},
|
||||
Flags: uint32(flagsB),
|
||||
TraceId: traceIDB,
|
||||
SpanId: spanIDB,
|
||||
SeverityNumber: pbSevD,
|
||||
SeverityText: "D",
|
||||
Body: pbBodyD,
|
||||
Attributes: []*cpb.KeyValue{pbJerry},
|
||||
Flags: uint32(flagsD),
|
||||
TraceId: traceIDD,
|
||||
SpanId: spanIDD,
|
||||
},
|
||||
{
|
||||
TimeUnixNano: 0,
|
||||
ObservedTimeUnixNano: uint64(obs.UnixNano()),
|
||||
SeverityNumber: pbSevB,
|
||||
SeverityText: "B",
|
||||
Body: pbBodyB,
|
||||
Attributes: []*cpb.KeyValue{pbBob},
|
||||
Flags: uint32(flagsB),
|
||||
TraceId: traceIDB,
|
||||
SpanId: spanIDB,
|
||||
SeverityNumber: pbSevD,
|
||||
SeverityText: "D",
|
||||
Body: pbBodyD,
|
||||
Attributes: []*cpb.KeyValue{pbJerry},
|
||||
Flags: uint32(flagsD),
|
||||
TraceId: traceIDD,
|
||||
SpanId: spanIDD,
|
||||
},
|
||||
}
|
||||
|
||||
pbScopeLogs = &lpb.ScopeLogs{
|
||||
pbScopeLogsList = []*lpb.ScopeLogs{
|
||||
{
|
||||
Scope: pbScope,
|
||||
SchemaUrl: semconv.SchemaURL,
|
||||
LogRecords: pbLogRecords,
|
||||
},
|
||||
{
|
||||
Scope: pbScope2,
|
||||
SchemaUrl: semconv.SchemaURL,
|
||||
LogRecords: pbLogRecords,
|
||||
},
|
||||
}
|
||||
|
||||
pbResourceLogs = &lpb.ResourceLogs{
|
||||
pbResourceLogsList = []*lpb.ResourceLogs{
|
||||
{
|
||||
Resource: pbRes,
|
||||
SchemaUrl: semconv.SchemaURL,
|
||||
ScopeLogs: []*lpb.ScopeLogs{pbScopeLogs},
|
||||
ScopeLogs: pbScopeLogsList,
|
||||
},
|
||||
{
|
||||
Resource: pbRes2,
|
||||
SchemaUrl: semconv.SchemaURL,
|
||||
ScopeLogs: pbScopeLogsList,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func TestResourceLogs(t *testing.T) {
|
||||
want := []*lpb.ResourceLogs{pbResourceLogs}
|
||||
assert.Equal(t, want, ResourceLogs(records))
|
||||
want := pbResourceLogsList
|
||||
assert.ElementsMatch(t, want, ResourceLogs(records))
|
||||
}
|
||||
|
||||
func TestSeverityNumber(t *testing.T) {
|
||||
|
@ -9,7 +9,6 @@
|
||||
package transform // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/transform"
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cpb "go.opentelemetry.io/proto/otlp/common/v1"
|
||||
@ -28,71 +27,25 @@ func ResourceLogs(records []log.Record) []*lpb.ResourceLogs {
|
||||
return nil
|
||||
}
|
||||
|
||||
resMap := resourceLogsMapPool.Get().(map[attribute.Distinct]*lpb.ResourceLogs)
|
||||
defer func() {
|
||||
clear(resMap)
|
||||
resourceLogsMapPool.Put(resMap)
|
||||
}()
|
||||
resourceLogsMap(&resMap, records)
|
||||
resMap := make(map[attribute.Distinct]*lpb.ResourceLogs)
|
||||
|
||||
out := make([]*lpb.ResourceLogs, 0, len(resMap))
|
||||
for _, rl := range resMap {
|
||||
out = append(out, rl)
|
||||
}
|
||||
return out
|
||||
type key struct {
|
||||
r attribute.Distinct
|
||||
is instrumentation.Scope
|
||||
}
|
||||
scopeMap := make(map[key]*lpb.ScopeLogs)
|
||||
|
||||
var resourceLogsMapPool = sync.Pool{
|
||||
New: func() any {
|
||||
return make(map[attribute.Distinct]*lpb.ResourceLogs)
|
||||
},
|
||||
}
|
||||
|
||||
func resourceLogsMap(dst *map[attribute.Distinct]*lpb.ResourceLogs, records []log.Record) {
|
||||
var resources int
|
||||
for _, r := range records {
|
||||
res := r.Resource()
|
||||
rl, ok := (*dst)[res.Equivalent()]
|
||||
if !ok {
|
||||
rl = new(lpb.ResourceLogs)
|
||||
if res.Len() > 0 {
|
||||
rl.Resource = &rpb.Resource{
|
||||
Attributes: AttrIter(res.Iter()),
|
||||
}
|
||||
}
|
||||
rl.SchemaUrl = res.SchemaURL()
|
||||
(*dst)[res.Equivalent()] = rl
|
||||
}
|
||||
rl.ScopeLogs = ScopeLogs(records)
|
||||
}
|
||||
}
|
||||
|
||||
// ScopeLogs returns a slice of OTLP ScopeLogs generated from records.
|
||||
func ScopeLogs(records []log.Record) []*lpb.ScopeLogs {
|
||||
scopeMap := scopeLogsMapPool.Get().(map[instrumentation.Scope]*lpb.ScopeLogs)
|
||||
defer func() {
|
||||
clear(scopeMap)
|
||||
scopeLogsMapPool.Put(scopeMap)
|
||||
}()
|
||||
scopeLogsMap(&scopeMap, records)
|
||||
|
||||
out := make([]*lpb.ScopeLogs, 0, len(scopeMap))
|
||||
for _, sl := range scopeMap {
|
||||
out = append(out, sl)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
var scopeLogsMapPool = sync.Pool{
|
||||
New: func() any {
|
||||
return make(map[instrumentation.Scope]*lpb.ScopeLogs)
|
||||
},
|
||||
}
|
||||
|
||||
func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.Record) {
|
||||
for _, r := range records {
|
||||
rKey := res.Equivalent()
|
||||
scope := r.InstrumentationScope()
|
||||
sl, ok := (*dst)[scope]
|
||||
if !ok {
|
||||
k := key{
|
||||
r: rKey,
|
||||
is: scope,
|
||||
}
|
||||
sl, iOk := scopeMap[k]
|
||||
if !iOk {
|
||||
sl = new(lpb.ScopeLogs)
|
||||
var emptyScope instrumentation.Scope
|
||||
if scope != emptyScope {
|
||||
@ -102,11 +55,35 @@ func scopeLogsMap(dst *map[instrumentation.Scope]*lpb.ScopeLogs, records []log.R
|
||||
}
|
||||
sl.SchemaUrl = scope.SchemaURL
|
||||
}
|
||||
(*dst)[scope] = sl
|
||||
scopeMap[k] = sl
|
||||
}
|
||||
|
||||
sl.LogRecords = append(sl.LogRecords, LogRecord(r))
|
||||
rl, rOk := resMap[rKey]
|
||||
if !rOk {
|
||||
resources++
|
||||
rl = new(lpb.ResourceLogs)
|
||||
if res.Len() > 0 {
|
||||
rl.Resource = &rpb.Resource{
|
||||
Attributes: AttrIter(res.Iter()),
|
||||
}
|
||||
}
|
||||
rl.SchemaUrl = res.SchemaURL()
|
||||
resMap[rKey] = rl
|
||||
}
|
||||
if !iOk {
|
||||
rl.ScopeLogs = append(rl.ScopeLogs, sl)
|
||||
}
|
||||
}
|
||||
|
||||
// Transform the categorized map into a slice
|
||||
resLogs := make([]*lpb.ResourceLogs, 0, resources)
|
||||
for _, rl := range resMap {
|
||||
resLogs = append(resLogs, rl)
|
||||
}
|
||||
|
||||
return resLogs
|
||||
}
|
||||
|
||||
// LogRecord returns an OTLP LogRecord generated from record.
|
||||
func LogRecord(record log.Record) *lpb.LogRecord {
|
||||
|
@ -30,73 +30,106 @@ var (
|
||||
ts = time.Date(2000, time.January, 0o1, 0, 0, 0, 0, time.FixedZone("GMT", 0))
|
||||
obs = ts.Add(30 * time.Second)
|
||||
|
||||
tom = api.String("user", "tom")
|
||||
jerry = api.String("user", "jerry")
|
||||
// A time before unix 0.
|
||||
negativeTs = time.Date(1969, 7, 20, 20, 17, 0, 0, time.UTC)
|
||||
|
||||
alice = api.String("user", "alice")
|
||||
bob = api.String("user", "bob")
|
||||
|
||||
pbAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "alice"},
|
||||
pbTom = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "tom"},
|
||||
}}
|
||||
pbBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "bob"},
|
||||
pbJerry = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "jerry"},
|
||||
}}
|
||||
|
||||
sevA = api.SeverityInfo
|
||||
sevB = api.SeverityError
|
||||
sevC = api.SeverityInfo
|
||||
sevD = api.SeverityError
|
||||
|
||||
pbSevA = lpb.SeverityNumber_SEVERITY_NUMBER_INFO
|
||||
pbSevB = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR
|
||||
pbSevC = lpb.SeverityNumber_SEVERITY_NUMBER_INFO
|
||||
pbSevD = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR
|
||||
|
||||
bodyA = api.StringValue("a")
|
||||
bodyB = api.StringValue("b")
|
||||
bodyC = api.StringValue("c")
|
||||
bodyD = api.StringValue("d")
|
||||
|
||||
pbBodyA = &cpb.AnyValue{
|
||||
pbBodyC = &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{
|
||||
StringValue: "a",
|
||||
StringValue: "c",
|
||||
},
|
||||
}
|
||||
pbBodyB = &cpb.AnyValue{
|
||||
pbBodyD = &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{
|
||||
StringValue: "b",
|
||||
StringValue: "d",
|
||||
},
|
||||
}
|
||||
|
||||
spanIDA = []byte{0, 0, 0, 0, 0, 0, 0, 1}
|
||||
spanIDB = []byte{0, 0, 0, 0, 0, 0, 0, 2}
|
||||
traceIDA = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
|
||||
traceIDB = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}
|
||||
flagsA = byte(1)
|
||||
flagsB = byte(0)
|
||||
spanIDC = []byte{0, 0, 0, 0, 0, 0, 0, 1}
|
||||
spanIDD = []byte{0, 0, 0, 0, 0, 0, 0, 2}
|
||||
traceIDC = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
|
||||
traceIDD = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}
|
||||
flagsC = byte(1)
|
||||
flagsD = byte(0)
|
||||
|
||||
scope = instrumentation.Scope{
|
||||
Name: "test/code/path",
|
||||
Version: "v0.1.0",
|
||||
Name: "otel/test/code/path1",
|
||||
Version: "v0.1.1",
|
||||
SchemaURL: semconv.SchemaURL,
|
||||
}
|
||||
scope2 = instrumentation.Scope{
|
||||
Name: "otel/test/code/path2",
|
||||
Version: "v0.2.2",
|
||||
SchemaURL: semconv.SchemaURL,
|
||||
}
|
||||
scopeList = []instrumentation.Scope{scope, scope2}
|
||||
|
||||
pbScope = &cpb.InstrumentationScope{
|
||||
Name: "test/code/path",
|
||||
Version: "v0.1.0",
|
||||
Name: "otel/test/code/path1",
|
||||
Version: "v0.1.1",
|
||||
}
|
||||
pbScope2 = &cpb.InstrumentationScope{
|
||||
Name: "otel/test/code/path2",
|
||||
Version: "v0.2.2",
|
||||
}
|
||||
|
||||
res = resource.NewWithAttributes(
|
||||
semconv.SchemaURL,
|
||||
semconv.ServiceName("test server"),
|
||||
semconv.ServiceVersion("v0.1.0"),
|
||||
semconv.ServiceName("service1"),
|
||||
semconv.ServiceVersion("v0.1.1"),
|
||||
)
|
||||
res2 = resource.NewWithAttributes(
|
||||
semconv.SchemaURL,
|
||||
semconv.ServiceName("service2"),
|
||||
semconv.ServiceVersion("v0.2.2"),
|
||||
)
|
||||
resList = []*resource.Resource{res, res2}
|
||||
|
||||
pbRes = &rpb.Resource{
|
||||
Attributes: []*cpb.KeyValue{
|
||||
{
|
||||
Key: "service.name",
|
||||
Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "test server"},
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "service1"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Key: "service.version",
|
||||
Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"},
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
pbRes2 = &rpb.Resource{
|
||||
Attributes: []*cpb.KeyValue{
|
||||
{
|
||||
Key: "service.name",
|
||||
Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "service2"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Key: "service.version",
|
||||
Value: &cpb.AnyValue{
|
||||
Value: &cpb.AnyValue_StringValue{StringValue: "v0.2.2"},
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -105,75 +138,79 @@ var (
|
||||
records = func() []log.Record {
|
||||
var out []log.Record
|
||||
|
||||
for _, r := range resList {
|
||||
for _, s := range scopeList {
|
||||
out = append(out, logtest.RecordFactory{
|
||||
Timestamp: ts,
|
||||
ObservedTimestamp: obs,
|
||||
Severity: sevA,
|
||||
SeverityText: "A",
|
||||
Body: bodyA,
|
||||
Attributes: []api.KeyValue{alice},
|
||||
TraceID: trace.TraceID(traceIDA),
|
||||
SpanID: trace.SpanID(spanIDA),
|
||||
TraceFlags: trace.TraceFlags(flagsA),
|
||||
InstrumentationScope: &scope,
|
||||
Resource: res,
|
||||
Severity: sevC,
|
||||
SeverityText: "C",
|
||||
Body: bodyC,
|
||||
Attributes: []api.KeyValue{tom},
|
||||
TraceID: trace.TraceID(traceIDC),
|
||||
SpanID: trace.SpanID(spanIDC),
|
||||
TraceFlags: trace.TraceFlags(flagsC),
|
||||
InstrumentationScope: &s,
|
||||
Resource: r,
|
||||
}.NewRecord())
|
||||
|
||||
out = append(out, logtest.RecordFactory{
|
||||
Timestamp: ts,
|
||||
ObservedTimestamp: obs,
|
||||
Severity: sevA,
|
||||
SeverityText: "A",
|
||||
Body: bodyA,
|
||||
Attributes: []api.KeyValue{bob},
|
||||
TraceID: trace.TraceID(traceIDA),
|
||||
SpanID: trace.SpanID(spanIDA),
|
||||
TraceFlags: trace.TraceFlags(flagsA),
|
||||
InstrumentationScope: &scope,
|
||||
Resource: res,
|
||||
Severity: sevC,
|
||||
SeverityText: "C",
|
||||
Body: bodyC,
|
||||
Attributes: []api.KeyValue{jerry},
|
||||
TraceID: trace.TraceID(traceIDC),
|
||||
SpanID: trace.SpanID(spanIDC),
|
||||
TraceFlags: trace.TraceFlags(flagsC),
|
||||
InstrumentationScope: &s,
|
||||
Resource: r,
|
||||
}.NewRecord())
|
||||
|
||||
out = append(out, logtest.RecordFactory{
|
||||
Timestamp: ts,
|
||||
ObservedTimestamp: obs,
|
||||
Severity: sevB,
|
||||
SeverityText: "B",
|
||||
Body: bodyB,
|
||||
Attributes: []api.KeyValue{alice},
|
||||
TraceID: trace.TraceID(traceIDB),
|
||||
SpanID: trace.SpanID(spanIDB),
|
||||
TraceFlags: trace.TraceFlags(flagsB),
|
||||
InstrumentationScope: &scope,
|
||||
Resource: res,
|
||||
Severity: sevD,
|
||||
SeverityText: "D",
|
||||
Body: bodyD,
|
||||
Attributes: []api.KeyValue{tom},
|
||||
TraceID: trace.TraceID(traceIDD),
|
||||
SpanID: trace.SpanID(spanIDD),
|
||||
TraceFlags: trace.TraceFlags(flagsD),
|
||||
InstrumentationScope: &s,
|
||||
Resource: r,
|
||||
}.NewRecord())
|
||||
|
||||
out = append(out, logtest.RecordFactory{
|
||||
Timestamp: ts,
|
||||
ObservedTimestamp: obs,
|
||||
Severity: sevB,
|
||||
SeverityText: "B",
|
||||
Body: bodyB,
|
||||
Attributes: []api.KeyValue{bob},
|
||||
TraceID: trace.TraceID(traceIDB),
|
||||
SpanID: trace.SpanID(spanIDB),
|
||||
TraceFlags: trace.TraceFlags(flagsB),
|
||||
InstrumentationScope: &scope,
|
||||
Resource: res,
|
||||
Severity: sevD,
|
||||
SeverityText: "D",
|
||||
Body: bodyD,
|
||||
Attributes: []api.KeyValue{jerry},
|
||||
TraceID: trace.TraceID(traceIDD),
|
||||
SpanID: trace.SpanID(spanIDD),
|
||||
TraceFlags: trace.TraceFlags(flagsD),
|
||||
InstrumentationScope: &s,
|
||||
Resource: r,
|
||||
}.NewRecord())
|
||||
|
||||
out = append(out, logtest.RecordFactory{
|
||||
Timestamp: negativeTs,
|
||||
ObservedTimestamp: obs,
|
||||
Severity: sevB,
|
||||
SeverityText: "B",
|
||||
Body: bodyB,
|
||||
Attributes: []api.KeyValue{bob},
|
||||
TraceID: trace.TraceID(traceIDB),
|
||||
SpanID: trace.SpanID(spanIDB),
|
||||
TraceFlags: trace.TraceFlags(flagsB),
|
||||
InstrumentationScope: &scope,
|
||||
Resource: res,
|
||||
Severity: sevD,
|
||||
SeverityText: "D",
|
||||
Body: bodyD,
|
||||
Attributes: []api.KeyValue{jerry},
|
||||
TraceID: trace.TraceID(traceIDD),
|
||||
SpanID: trace.SpanID(spanIDD),
|
||||
TraceFlags: trace.TraceFlags(flagsD),
|
||||
InstrumentationScope: &s,
|
||||
Resource: r,
|
||||
}.NewRecord())
|
||||
}
|
||||
}
|
||||
|
||||
return out
|
||||
}()
|
||||
@ -182,76 +219,90 @@ var (
|
||||
{
|
||||
TimeUnixNano: uint64(ts.UnixNano()),
|
||||
ObservedTimeUnixNano: uint64(obs.UnixNano()),
|
||||
SeverityNumber: pbSevA,
|
||||
SeverityText: "A",
|
||||
Body: pbBodyA,
|
||||
Attributes: []*cpb.KeyValue{pbAlice},
|
||||
Flags: uint32(flagsA),
|
||||
TraceId: traceIDA,
|
||||
SpanId: spanIDA,
|
||||
SeverityNumber: pbSevC,
|
||||
SeverityText: "C",
|
||||
Body: pbBodyC,
|
||||
Attributes: []*cpb.KeyValue{pbTom},
|
||||
Flags: uint32(flagsC),
|
||||
TraceId: traceIDC,
|
||||
SpanId: spanIDC,
|
||||
},
|
||||
{
|
||||
TimeUnixNano: uint64(ts.UnixNano()),
|
||||
ObservedTimeUnixNano: uint64(obs.UnixNano()),
|
||||
SeverityNumber: pbSevA,
|
||||
SeverityText: "A",
|
||||
Body: pbBodyA,
|
||||
Attributes: []*cpb.KeyValue{pbBob},
|
||||
Flags: uint32(flagsA),
|
||||
TraceId: traceIDA,
|
||||
SpanId: spanIDA,
|
||||
SeverityNumber: pbSevC,
|
||||
SeverityText: "C",
|
||||
Body: pbBodyC,
|
||||
Attributes: []*cpb.KeyValue{pbJerry},
|
||||
Flags: uint32(flagsC),
|
||||
TraceId: traceIDC,
|
||||
SpanId: spanIDC,
|
||||
},
|
||||
{
|
||||
TimeUnixNano: uint64(ts.UnixNano()),
|
||||
ObservedTimeUnixNano: uint64(obs.UnixNano()),
|
||||
SeverityNumber: pbSevB,
|
||||
SeverityText: "B",
|
||||
Body: pbBodyB,
|
||||
Attributes: []*cpb.KeyValue{pbAlice},
|
||||
Flags: uint32(flagsB),
|
||||
TraceId: traceIDB,
|
||||
SpanId: spanIDB,
|
||||
SeverityNumber: pbSevD,
|
||||
SeverityText: "D",
|
||||
Body: pbBodyD,
|
||||
Attributes: []*cpb.KeyValue{pbTom},
|
||||
Flags: uint32(flagsD),
|
||||
TraceId: traceIDD,
|
||||
SpanId: spanIDD,
|
||||
},
|
||||
{
|
||||
TimeUnixNano: uint64(ts.UnixNano()),
|
||||
ObservedTimeUnixNano: uint64(obs.UnixNano()),
|
||||
SeverityNumber: pbSevB,
|
||||
SeverityText: "B",
|
||||
Body: pbBodyB,
|
||||
Attributes: []*cpb.KeyValue{pbBob},
|
||||
Flags: uint32(flagsB),
|
||||
TraceId: traceIDB,
|
||||
SpanId: spanIDB,
|
||||
SeverityNumber: pbSevD,
|
||||
SeverityText: "D",
|
||||
Body: pbBodyD,
|
||||
Attributes: []*cpb.KeyValue{pbJerry},
|
||||
Flags: uint32(flagsD),
|
||||
TraceId: traceIDD,
|
||||
SpanId: spanIDD,
|
||||
},
|
||||
{
|
||||
TimeUnixNano: 0,
|
||||
ObservedTimeUnixNano: uint64(obs.UnixNano()),
|
||||
SeverityNumber: pbSevB,
|
||||
SeverityText: "B",
|
||||
Body: pbBodyB,
|
||||
Attributes: []*cpb.KeyValue{pbBob},
|
||||
Flags: uint32(flagsB),
|
||||
TraceId: traceIDB,
|
||||
SpanId: spanIDB,
|
||||
SeverityNumber: pbSevD,
|
||||
SeverityText: "D",
|
||||
Body: pbBodyD,
|
||||
Attributes: []*cpb.KeyValue{pbJerry},
|
||||
Flags: uint32(flagsD),
|
||||
TraceId: traceIDD,
|
||||
SpanId: spanIDD,
|
||||
},
|
||||
}
|
||||
|
||||
pbScopeLogs = &lpb.ScopeLogs{
|
||||
pbScopeLogsList = []*lpb.ScopeLogs{
|
||||
{
|
||||
Scope: pbScope,
|
||||
SchemaUrl: semconv.SchemaURL,
|
||||
LogRecords: pbLogRecords,
|
||||
},
|
||||
{
|
||||
Scope: pbScope2,
|
||||
SchemaUrl: semconv.SchemaURL,
|
||||
LogRecords: pbLogRecords,
|
||||
},
|
||||
}
|
||||
|
||||
pbResourceLogs = &lpb.ResourceLogs{
|
||||
pbResourceLogsList = []*lpb.ResourceLogs{
|
||||
{
|
||||
Resource: pbRes,
|
||||
SchemaUrl: semconv.SchemaURL,
|
||||
ScopeLogs: []*lpb.ScopeLogs{pbScopeLogs},
|
||||
ScopeLogs: pbScopeLogsList,
|
||||
},
|
||||
{
|
||||
Resource: pbRes2,
|
||||
SchemaUrl: semconv.SchemaURL,
|
||||
ScopeLogs: pbScopeLogsList,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func TestResourceLogs(t *testing.T) {
|
||||
want := []*lpb.ResourceLogs{pbResourceLogs}
|
||||
assert.Equal(t, want, ResourceLogs(records))
|
||||
want := pbResourceLogsList
|
||||
assert.ElementsMatch(t, want, ResourceLogs(records))
|
||||
}
|
||||
|
||||
func TestSeverityNumber(t *testing.T) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user