1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-28 03:57:09 +02:00

Split large jaeger span batch to admire the udp packet size limit (#1853)

* Split large jaeger span batch to admire the udp packet size

* Refactory EmitBatch and produce complaining error msg when serialization fails

* Add tests for large jaeger spans
Update CHANGELOG.md

* Update CHANGELOG.md

* Fix compatibility-test on windows.

* Add test case for exporting spans with multiple errors.
This commit is contained in:
Alex 2021-05-05 02:31:41 +08:00 committed by GitHub
parent 42a845093e
commit c99d5e999c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 167 additions and 9 deletions

View File

@ -93,6 +93,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE`
- `OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE`
- Adds `otlpgrpc.WithTimeout` option for configuring timeout to the otlp/gRPC exporter. (#1821)
- Adds `jaeger.WithMaxPacketSize` option for configuring maximum UDP packet size used when connecting to the Jaeger agent. (#1853)
### Fixed
@ -104,6 +105,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Zipkin Exporter: Ensure mapping between OTel and Zipkin span data complies with the specification. (#1688)
- Fixed typo for default service name in Jaeger Exporter. (#1797)
- Fix flaky OTLP for the reconnnection of the client connection. (#1527, #1814)
- Fix Jaeger exporter dropping of span batches that exceed the UDP packet size limit.
Instead, the exporter now splits the batch into smaller sendable batches. (#1828)
### Changed

View File

@ -20,6 +20,7 @@ import (
"io"
"log"
"net"
"strings"
"time"
"go.opentelemetry.io/otel/exporters/trace/jaeger/internal/third_party/thrift/lib/go/thrift"
@ -36,10 +37,11 @@ type agentClientUDP struct {
genAgent.Agent
io.Closer
connUDP udpConn
client *genAgent.AgentClient
maxPacketSize int // max size of datagram in bytes
thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
connUDP udpConn
client *genAgent.AgentClient
maxPacketSize int // max size of datagram in bytes
thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
thriftProtocol thrift.TProtocol
}
type udpConn interface {
@ -75,6 +77,7 @@ func newAgentClientUDP(params agentClientUDPParams) (*agentClientUDP, error) {
thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
protocolFactory := thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{})
thriftProtocol := protocolFactory.GetProtocol(thriftBuffer)
client := genAgent.NewAgentClientFactory(thriftBuffer, protocolFactory)
var connUDP udpConn
@ -103,15 +106,78 @@ func newAgentClientUDP(params agentClientUDPParams) (*agentClientUDP, error) {
}
return &agentClientUDP{
connUDP: connUDP,
client: client,
maxPacketSize: params.MaxPacketSize,
thriftBuffer: thriftBuffer,
connUDP: connUDP,
client: client,
maxPacketSize: params.MaxPacketSize,
thriftBuffer: thriftBuffer,
thriftProtocol: thriftProtocol,
}, nil
}
// EmitBatch implements EmitBatch() of Agent interface
// EmitBatch buffers batch to fit into UDP packets and sends the data to the agent.
func (a *agentClientUDP) EmitBatch(ctx context.Context, batch *gen.Batch) error {
var errs []error
processSize, err := a.calcSizeOfSerializedThrift(ctx, batch.Process)
if err != nil {
// drop the batch if serialization of process fails.
return err
}
totalSize := processSize
var spans []*gen.Span
for _, span := range batch.Spans {
spanSize, err := a.calcSizeOfSerializedThrift(ctx, span)
if err != nil {
errs = append(errs, fmt.Errorf("thrift serialization failed: %v", span))
continue
}
if spanSize+processSize >= a.maxPacketSize {
// drop the span that exceeds the limit.
errs = append(errs, fmt.Errorf("span too large to send: %v", span))
continue
}
if totalSize+spanSize >= a.maxPacketSize {
if err := a.flush(ctx, &gen.Batch{
Process: batch.Process,
Spans: spans,
}); err != nil {
errs = append(errs, err)
}
spans = spans[:0]
totalSize = processSize
}
totalSize += spanSize
spans = append(spans, span)
}
if len(spans) > 0 {
if err := a.flush(ctx, &gen.Batch{
Process: batch.Process,
Spans: spans,
}); err != nil {
errs = append(errs, err)
}
}
if len(errs) == 1 {
return errs[0]
} else if len(errs) > 1 {
joined := a.makeJoinedErrorString(errs)
return fmt.Errorf("multiple errors during transform: %s", joined)
}
return nil
}
// makeJoinedErrorString join all the errors to one error message.
func (a *agentClientUDP) makeJoinedErrorString(errs []error) string {
var errMsgs []string
for _, err := range errs {
errMsgs = append(errMsgs, err.Error())
}
return strings.Join(errMsgs, ", ")
}
// flush will send the batch of spans to the agent.
func (a *agentClientUDP) flush(ctx context.Context, batch *gen.Batch) error {
a.thriftBuffer.Reset()
if err := a.client.EmitBatch(ctx, batch); err != nil {
return err
@ -124,6 +190,13 @@ func (a *agentClientUDP) EmitBatch(ctx context.Context, batch *gen.Batch) error
return err
}
// calcSizeOfSerializedThrift calculate the serialized thrift packet size.
func (a *agentClientUDP) calcSizeOfSerializedThrift(ctx context.Context, thriftStruct thrift.TStruct) (int, error) {
a.thriftBuffer.Reset()
err := thriftStruct.Write(ctx, a.thriftProtocol)
return a.thriftBuffer.Len(), err
}
// Close implements Close() of io.Closer and closes the underlying UDP connection.
func (a *agentClientUDP) Close() error {
return a.connUDP.Close()

View File

@ -14,12 +14,16 @@
package jaeger
import (
"context"
"log"
"net"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
)
func TestNewAgentClientUDPWithParamsBadHostport(t *testing.T) {
@ -99,3 +103,74 @@ func TestNewAgentClientUDPWithParamsReconnectingDisabled(t *testing.T) {
assert.NoError(t, agentClient.Close())
}
type errorHandler struct{ t *testing.T }
func (eh errorHandler) Handle(err error) { assert.NoError(eh.t, err) }
func TestJaegerAgentUDPLimitBatching(t *testing.T) {
otel.SetErrorHandler(errorHandler{t})
// 1500 spans, size 79559, does not fit within one UDP packet with the default size of 65000.
n := 1500
s := make([]*tracesdk.SpanSnapshot, n)
for i := 0; i < n; i++ {
s[i] = &tracesdk.SpanSnapshot{}
}
exp, err := NewRawExporter(
WithAgentEndpoint(WithAgentHost("localhost"), WithAgentPort("6831")),
)
require.NoError(t, err)
ctx := context.Background()
assert.NoError(t, exp.ExportSpans(ctx, s))
assert.NoError(t, exp.Shutdown(ctx))
}
// generateALargeSpan generates a span with a long name.
func generateALargeSpan() *tracesdk.SpanSnapshot {
span := &tracesdk.SpanSnapshot{
Name: "a-longer-name-that-makes-it-exceeds-limit",
}
return span
}
func TestSpanExceedsMaxPacketLimit(t *testing.T) {
otel.SetErrorHandler(errorHandler{t})
// 106 is the serialized size of a span with default values.
maxSize := 106
span := generateALargeSpan()
largeSpans := []*tracesdk.SpanSnapshot{span, {}}
normalSpans := []*tracesdk.SpanSnapshot{{}, {}}
exp, err := NewRawExporter(
WithAgentEndpoint(WithAgentHost("localhost"), WithAgentPort("6831"), WithMaxPacketSize(maxSize+1)),
)
require.NoError(t, err)
ctx := context.Background()
assert.Error(t, exp.ExportSpans(ctx, largeSpans))
assert.NoError(t, exp.ExportSpans(ctx, normalSpans))
assert.NoError(t, exp.Shutdown(ctx))
}
func TestEmitBatchWithMultipleErrors(t *testing.T) {
otel.SetErrorHandler(errorHandler{t})
span := generateALargeSpan()
largeSpans := []*tracesdk.SpanSnapshot{span, span}
// make max packet size smaller than span
maxSize := len(span.Name)
exp, err := NewRawExporter(
WithAgentEndpoint(WithAgentHost("localhost"), WithAgentPort("6831"), WithMaxPacketSize(maxSize)),
)
require.NoError(t, err)
ctx := context.Background()
err = exp.ExportSpans(ctx, largeSpans)
assert.Error(t, err)
require.Contains(t, err.Error(), "multiple errors")
}

View File

@ -114,6 +114,13 @@ func WithAttemptReconnectingInterval(interval time.Duration) AgentEndpointOption
}
}
// WithMaxPacketSize sets the maximum UDP packet size for transport to the Jaeger agent.
func WithMaxPacketSize(size int) AgentEndpointOption {
return func(o *AgentEndpointOptions) {
o.MaxPacketSize = size
}
}
// WithCollectorEndpoint defines the full url to the Jaeger HTTP Thrift collector. This will
// use the following environment variables for configuration if no explicit option is provided:
//