1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-30 04:40:41 +02:00

Move gRPC driver to a subpackage and add an HTTP driver (#1420)

* Move grpc stuff to separate package

* Drop duplicated retryable status code

* Set default port to 4317

This is what the specification says for both gRPC and HTTP.

* Document gRPC option type

* Add an HTTP protocol driver for OTLP exporter

Currently it supports only binary protobuf payloads.

* Move end to end test to a separate package

It also adds some common code mock collectors can use. This will be
useful for testing the HTTP driver.

* Move export data creators to otlptest

It also extends the one record checkpointer a bit. This will be useful
for testing the HTTP driver.

* Add an HTTP mock collector and tests for HTTP driver

* Update changelog

* Do not depend on DefaultTransport

We create our own instance of the transport, which is based on
golang's DefaultTransport. That way we sidestep the issue of the
DefaultTransport being modified/overwritten. We won't have any panics
at init. The cost of it is to keep the transport fields in sync with
DefaultTransport.

* Read the whole response body before closing it

This may help with connection reuse.

* Change options to conform to our style guide

* Add jitter to backoff time

* Test TLS option

* Test extra headers

* Fix a comment

* Increase coverage

* Add a source of the backoff strategy
This commit is contained in:
Krzesimir Nowak 2021-01-12 04:55:24 +01:00 committed by GitHub
parent 9332af1b46
commit 8d80981465
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 2605 additions and 1031 deletions

View File

@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `NewSplitDriver` for OTLP exporter that allows sending traces and metrics to different endpoints. (#1418)
- Add codeql worfklow to GitHub Actions (#1428)
- Added Gosec workflow to GitHub Actions (#1429)
- A new HTTP driver for OTLP exporter in `exporters/otlp/otlphttp`. Currently it only supports the binary protobuf payloads. (#1420)
### Changed
@ -29,6 +30,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Unify endpoint API that related to OTel exporter. (#1401)
- Metric aggregator Count() and histogram Bucket.Counts are consistently `uint64`. (1430)
- `SamplingResult` now passed a `Tracestate` from the parent `SpanContext` (#1432)
- Moved gRPC driver for OTLP exporter to `exporters/otlp/otlpgrpc`. (#1420)
### Removed

View File

@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp"
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
@ -49,10 +50,10 @@ func initProvider() func() {
// `localhost:30080` endpoint. Otherwise, replace `localhost` with the
// endpoint of your cluster. If you run the app inside k8s, then you can
// probably connect directly to the service through dns
driver := otlp.NewGRPCDriver(
otlp.WithInsecure(),
otlp.WithEndpoint("localhost:30080"),
otlp.WithGRPCDialOption(grpc.WithBlock()), // useful for testing
driver := otlpgrpc.NewDriver(
otlpgrpc.WithInsecure(),
otlpgrpc.WithEndpoint("localhost:30080"),
otlpgrpc.WithDialOption(grpc.WithBlock()), // useful for testing
)
exp, err := otlp.NewExporter(ctx, driver)
handleErr(err, "failed to create exporter")

View File

@ -0,0 +1,137 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlptest
import (
"sort"
collectormetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
collectortracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1"
commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1"
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1"
tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1"
)
// Collector is an interface that mock collectors should implements,
// so they can be used for the end-to-end testing.
type Collector interface {
Stop() error
GetResourceSpans() []*tracepb.ResourceSpans
GetMetrics() []*metricpb.Metric
}
// SpansStorage stores the spans. Mock collectors could use it to
// store spans they have received.
type SpansStorage struct {
rsm map[string]*tracepb.ResourceSpans
spanCount int
}
// MetricsStorage stores the metrics. Mock collectors could use it to
// store metrics they have received.
type MetricsStorage struct {
metrics []*metricpb.Metric
}
// NewSpansStorage creates a new spans storage.
func NewSpansStorage() SpansStorage {
return SpansStorage{
rsm: make(map[string]*tracepb.ResourceSpans),
}
}
// AddSpans adds spans to the spans storage.
func (s *SpansStorage) AddSpans(request *collectortracepb.ExportTraceServiceRequest) {
for _, rs := range request.GetResourceSpans() {
rstr := resourceString(rs.Resource)
if existingRs, ok := s.rsm[rstr]; !ok {
s.rsm[rstr] = rs
// TODO (rghetia): Add support for library Info.
if len(rs.InstrumentationLibrarySpans) == 0 {
rs.InstrumentationLibrarySpans = []*tracepb.InstrumentationLibrarySpans{
{
Spans: []*tracepb.Span{},
},
}
}
s.spanCount += len(rs.InstrumentationLibrarySpans[0].Spans)
} else {
if len(rs.InstrumentationLibrarySpans) > 0 {
newSpans := rs.InstrumentationLibrarySpans[0].GetSpans()
existingRs.InstrumentationLibrarySpans[0].Spans =
append(existingRs.InstrumentationLibrarySpans[0].Spans,
newSpans...)
s.spanCount += len(newSpans)
}
}
}
}
// GetSpans returns the stored spans.
func (s *SpansStorage) GetSpans() []*tracepb.Span {
spans := make([]*tracepb.Span, 0, s.spanCount)
for _, rs := range s.rsm {
spans = append(spans, rs.InstrumentationLibrarySpans[0].Spans...)
}
return spans
}
// GetResourceSpans returns the stored resource spans.
func (s *SpansStorage) GetResourceSpans() []*tracepb.ResourceSpans {
rss := make([]*tracepb.ResourceSpans, 0, len(s.rsm))
for _, rs := range s.rsm {
rss = append(rss, rs)
}
return rss
}
// NewMetricsStorage creates a new metrics storage.
func NewMetricsStorage() MetricsStorage {
return MetricsStorage{}
}
// AddMetrics adds metrics to the metrics storage.
func (s *MetricsStorage) AddMetrics(request *collectormetricpb.ExportMetricsServiceRequest) {
for _, rm := range request.GetResourceMetrics() {
// TODO (rghetia) handle multiple resource and library info.
if len(rm.InstrumentationLibraryMetrics) > 0 {
s.metrics = append(s.metrics, rm.InstrumentationLibraryMetrics[0].Metrics...)
}
}
}
// GetMetrics returns the stored metrics.
func (s *MetricsStorage) GetMetrics() []*metricpb.Metric {
// copy in order to not change.
m := make([]*metricpb.Metric, 0, len(s.metrics))
return append(m, s.metrics...)
}
func resourceString(res *resourcepb.Resource) string {
sAttrs := sortedAttributes(res.GetAttributes())
rstr := ""
for _, attr := range sAttrs {
rstr = rstr + attr.String()
}
return rstr
}
func sortedAttributes(attrs []*commonpb.KeyValue) []*commonpb.KeyValue {
sort.Slice(attrs[:], func(i, j int) bool {
return attrs[i].Key < attrs[j].Key
})
return attrs
}

View File

@ -0,0 +1,138 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlptest
import (
"context"
"fmt"
"time"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
exportmetric "go.opentelemetry.io/otel/sdk/export/metric"
exporttrace "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/trace"
)
// Used to avoid implementing locking functions for test
// checkpointsets.
type noopLocker struct{}
// Lock implements sync.Locker, which is needed for
// exportmetric.CheckpointSet.
func (noopLocker) Lock() {}
// Unlock implements sync.Locker, which is needed for
// exportmetric.CheckpointSet.
func (noopLocker) Unlock() {}
// RLock implements exportmetric.CheckpointSet.
func (noopLocker) RLock() {}
// RUnlock implements exportmetric.CheckpointSet.
func (noopLocker) RUnlock() {}
// OneRecordCheckpointSet is a CheckpointSet that returns just one
// filled record. It may be useful for testing driver's metrics
// export.
type OneRecordCheckpointSet struct {
noopLocker
}
var _ exportmetric.CheckpointSet = OneRecordCheckpointSet{}
// ForEach implements exportmetric.CheckpointSet. It always invokes
// the callback once with always the same record.
func (OneRecordCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelector, recordFunc func(exportmetric.Record) error) error {
desc := metric.NewDescriptor(
"foo",
metric.CounterInstrumentKind,
number.Int64Kind,
)
res := resource.NewWithAttributes(label.String("a", "b"))
agg := sum.New(1)
if err := agg[0].Update(context.Background(), number.NewInt64Number(42), &desc); err != nil {
return err
}
start := time.Date(2020, time.December, 8, 19, 15, 0, 0, time.UTC)
end := time.Date(2020, time.December, 8, 19, 16, 0, 0, time.UTC)
labels := label.NewSet(label.String("abc", "def"), label.Int64("one", 1))
rec := exportmetric.NewRecord(&desc, &labels, res, agg[0].Aggregation(), start, end)
return recordFunc(rec)
}
// SingleSpanSnapshot returns a one-element slice with a snapshot. It
// may be useful for testing driver's trace export.
func SingleSpanSnapshot() []*exporttrace.SpanSnapshot {
sd := &exporttrace.SpanSnapshot{
SpanContext: trace.SpanContext{
TraceID: trace.TraceID{2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9},
SpanID: trace.SpanID{3, 4, 5, 6, 7, 8, 9, 0},
TraceFlags: trace.FlagsSampled,
},
ParentSpanID: trace.SpanID{1, 2, 3, 4, 5, 6, 7, 8},
SpanKind: trace.SpanKindInternal,
Name: "foo",
StartTime: time.Date(2020, time.December, 8, 20, 23, 0, 0, time.UTC),
EndTime: time.Date(2020, time.December, 0, 20, 24, 0, 0, time.UTC),
Attributes: []label.KeyValue{},
MessageEvents: []exporttrace.Event{},
Links: []trace.Link{},
StatusCode: codes.Ok,
StatusMessage: "",
HasRemoteParent: false,
DroppedAttributeCount: 0,
DroppedMessageEventCount: 0,
DroppedLinkCount: 0,
ChildSpanCount: 0,
Resource: resource.NewWithAttributes(label.String("a", "b")),
InstrumentationLibrary: instrumentation.Library{
Name: "bar",
Version: "0.0.0",
},
}
return []*exporttrace.SpanSnapshot{sd}
}
// EmptyCheckpointSet is a checkpointer that has no records at all.
type EmptyCheckpointSet struct {
noopLocker
}
var _ exportmetric.CheckpointSet = EmptyCheckpointSet{}
// ForEach implements exportmetric.CheckpointSet. It never invokes the
// callback.
func (EmptyCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelector, recordFunc func(exportmetric.Record) error) error {
return nil
}
// FailCheckpointSet is a checkpointer that returns an error during
// ForEach.
type FailCheckpointSet struct {
noopLocker
}
var _ exportmetric.CheckpointSet = FailCheckpointSet{}
// ForEach implements exportmetric.CheckpointSet. It always fails.
func (FailCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelector, recordFunc func(exportmetric.Record) error) error {
return fmt.Errorf("fail")
}

View File

@ -0,0 +1,266 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlptest
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/exporters/otlp"
commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
exportmetric "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
// RunEndToEndTest can be used by protocol driver tests to validate
// themselves.
func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlp.Exporter, mcTraces, mcMetrics Collector) {
pOpts := []sdktrace.TracerProviderOption{
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithBatcher(
exp,
// add following two options to ensure flush
sdktrace.WithBatchTimeout(5),
sdktrace.WithMaxExportBatchSize(10),
),
}
tp1 := sdktrace.NewTracerProvider(append(pOpts,
sdktrace.WithResource(resource.NewWithAttributes(
label.String("rk1", "rv11)"),
label.Int64("rk2", 5),
)))...)
tp2 := sdktrace.NewTracerProvider(append(pOpts,
sdktrace.WithResource(resource.NewWithAttributes(
label.String("rk1", "rv12)"),
label.Float32("rk3", 6.5),
)))...)
tr1 := tp1.Tracer("test-tracer1")
tr2 := tp2.Tracer("test-tracer2")
// Now create few spans
m := 4
for i := 0; i < m; i++ {
_, span := tr1.Start(ctx, "AlwaysSample")
span.SetAttributes(label.Int64("i", int64(i)))
span.End()
_, span = tr2.Start(ctx, "AlwaysSample")
span.SetAttributes(label.Int64("i", int64(i)))
span.End()
}
selector := simple.NewWithInexpensiveDistribution()
processor := processor.New(selector, exportmetric.StatelessExportKindSelector())
pusher := push.New(processor, exp)
pusher.Start()
meter := pusher.MeterProvider().Meter("test-meter")
labels := []label.KeyValue{label.Bool("test", true)}
type data struct {
iKind metric.InstrumentKind
nKind number.Kind
val int64
}
instruments := map[string]data{
"test-int64-counter": {metric.CounterInstrumentKind, number.Int64Kind, 1},
"test-float64-counter": {metric.CounterInstrumentKind, number.Float64Kind, 1},
"test-int64-valuerecorder": {metric.ValueRecorderInstrumentKind, number.Int64Kind, 2},
"test-float64-valuerecorder": {metric.ValueRecorderInstrumentKind, number.Float64Kind, 2},
"test-int64-valueobserver": {metric.ValueObserverInstrumentKind, number.Int64Kind, 3},
"test-float64-valueobserver": {metric.ValueObserverInstrumentKind, number.Float64Kind, 3},
}
for name, data := range instruments {
data := data
switch data.iKind {
case metric.CounterInstrumentKind:
switch data.nKind {
case number.Int64Kind:
metric.Must(meter).NewInt64Counter(name).Add(ctx, data.val, labels...)
case number.Float64Kind:
metric.Must(meter).NewFloat64Counter(name).Add(ctx, float64(data.val), labels...)
default:
assert.Failf(t, "unsupported number testing kind", data.nKind.String())
}
case metric.ValueRecorderInstrumentKind:
switch data.nKind {
case number.Int64Kind:
metric.Must(meter).NewInt64ValueRecorder(name).Record(ctx, data.val, labels...)
case number.Float64Kind:
metric.Must(meter).NewFloat64ValueRecorder(name).Record(ctx, float64(data.val), labels...)
default:
assert.Failf(t, "unsupported number testing kind", data.nKind.String())
}
case metric.ValueObserverInstrumentKind:
switch data.nKind {
case number.Int64Kind:
metric.Must(meter).NewInt64ValueObserver(name,
func(_ context.Context, result metric.Int64ObserverResult) {
result.Observe(data.val, labels...)
},
)
case number.Float64Kind:
callback := func(v float64) metric.Float64ObserverFunc {
return metric.Float64ObserverFunc(func(_ context.Context, result metric.Float64ObserverResult) { result.Observe(v, labels...) })
}(float64(data.val))
metric.Must(meter).NewFloat64ValueObserver(name, callback)
default:
assert.Failf(t, "unsupported number testing kind", data.nKind.String())
}
default:
assert.Failf(t, "unsupported metrics testing kind", data.iKind.String())
}
}
// Flush and close.
pusher.Stop()
func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := tp1.Shutdown(ctx); err != nil {
t.Fatalf("failed to shut down a tracer provider 1: %v", err)
}
if err := tp2.Shutdown(ctx); err != nil {
t.Fatalf("failed to shut down a tracer provider 2: %v", err)
}
}()
// Wait >2 cycles.
<-time.After(40 * time.Millisecond)
// Now shutdown the exporter
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf("failed to stop the exporter: %v", err)
}
// Shutdown the collector too so that we can begin
// verification checks of expected data back.
_ = mcTraces.Stop()
_ = mcMetrics.Stop()
// Now verify that we only got two resources
rss := mcTraces.GetResourceSpans()
if got, want := len(rss), 2; got != want {
t.Fatalf("resource span count: got %d, want %d\n", got, want)
}
// Now verify spans and attributes for each resource span.
for _, rs := range rss {
if len(rs.InstrumentationLibrarySpans) == 0 {
t.Fatalf("zero Instrumentation Library Spans")
}
if got, want := len(rs.InstrumentationLibrarySpans[0].Spans), m; got != want {
t.Fatalf("span counts: got %d, want %d", got, want)
}
attrMap := map[int64]bool{}
for _, s := range rs.InstrumentationLibrarySpans[0].Spans {
if gotName, want := s.Name, "AlwaysSample"; gotName != want {
t.Fatalf("span name: got %s, want %s", gotName, want)
}
attrMap[s.Attributes[0].Value.Value.(*commonpb.AnyValue_IntValue).IntValue] = true
}
if got, want := len(attrMap), m; got != want {
t.Fatalf("span attribute unique values: got %d want %d", got, want)
}
for i := 0; i < m; i++ {
_, ok := attrMap[int64(i)]
if !ok {
t.Fatalf("span with attribute %d missing", i)
}
}
}
metrics := mcMetrics.GetMetrics()
assert.Len(t, metrics, len(instruments), "not enough metrics exported")
seen := make(map[string]struct{}, len(instruments))
for _, m := range metrics {
data, ok := instruments[m.Name]
if !ok {
assert.Failf(t, "unknown metrics", m.Name)
continue
}
seen[m.Name] = struct{}{}
switch data.iKind {
case metric.CounterInstrumentKind:
switch data.nKind {
case number.Int64Kind:
if dp := m.GetIntSum().DataPoints; assert.Len(t, dp, 1) {
assert.Equal(t, data.val, dp[0].Value, "invalid value for %q", m.Name)
}
case number.Float64Kind:
if dp := m.GetDoubleSum().DataPoints; assert.Len(t, dp, 1) {
assert.Equal(t, float64(data.val), dp[0].Value, "invalid value for %q", m.Name)
}
default:
assert.Failf(t, "invalid number kind", data.nKind.String())
}
case metric.ValueObserverInstrumentKind:
switch data.nKind {
case number.Int64Kind:
if dp := m.GetIntGauge().DataPoints; assert.Len(t, dp, 1) {
assert.Equal(t, data.val, dp[0].Value, "invalid value for %q", m.Name)
}
case number.Float64Kind:
if dp := m.GetDoubleGauge().DataPoints; assert.Len(t, dp, 1) {
assert.Equal(t, float64(data.val), dp[0].Value, "invalid value for %q", m.Name)
}
default:
assert.Failf(t, "invalid number kind", data.nKind.String())
}
case metric.ValueRecorderInstrumentKind:
switch data.nKind {
case number.Int64Kind:
assert.NotNil(t, m.GetIntHistogram())
if dp := m.GetIntHistogram().DataPoints; assert.Len(t, dp, 1) {
count := dp[0].Count
assert.Equal(t, uint64(1), count, "invalid count for %q", m.Name)
assert.Equal(t, int64(data.val*int64(count)), dp[0].Sum, "invalid sum for %q (value %d)", m.Name, data.val)
}
case number.Float64Kind:
assert.NotNil(t, m.GetDoubleHistogram())
if dp := m.GetDoubleHistogram().DataPoints; assert.Len(t, dp, 1) {
count := dp[0].Count
assert.Equal(t, uint64(1), count, "invalid count for %q", m.Name)
assert.Equal(t, float64(data.val*int64(count)), dp[0].Sum, "invalid sum for %q (value %d)", m.Name, data.val)
}
default:
assert.Failf(t, "invalid number kind", data.nKind.String())
}
default:
assert.Failf(t, "invalid metrics kind", data.iKind.String())
}
}
for i := range instruments {
if _, ok := seen[i]; !ok {
assert.Fail(t, fmt.Sprintf("no metric(s) exported for %q", i))
}
}
}

View File

@ -21,7 +21,7 @@ import (
const (
// DefaultCollectorPort is the port the Exporter will attempt connect to
// if no collector port is provided.
DefaultCollectorPort uint16 = 55680
DefaultCollectorPort uint16 = 4317
// DefaultCollectorHost is the host address the Exporter will attempt
// connect to if no collector address is provided.
DefaultCollectorHost string = "localhost"

View File

@ -69,9 +69,7 @@ func NewUnstartedExporter(driver ProtocolDriver, opts ...ExporterOption) *Export
}
var (
errNoClient = errors.New("no client")
errAlreadyStarted = errors.New("already started")
errDisconnected = errors.New("exporter disconnected")
)
// Start establishes connections to the OpenTelemetry collector. Starting an

View File

@ -1,809 +0,0 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlp_test
import (
"context"
"fmt"
"net"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp"
commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
exportmetric "go.opentelemetry.io/otel/sdk/export/metric"
exporttrace "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)
func TestNewExporter_endToEnd(t *testing.T) {
tests := []struct {
name string
additionalOpts []otlp.GRPCConnectionOption
}{
{
name: "StandardExporter",
},
{
name: "WithCompressor",
additionalOpts: []otlp.GRPCConnectionOption{
otlp.WithCompressor(gzip.Name),
},
},
{
name: "WithGRPCServiceConfig",
additionalOpts: []otlp.GRPCConnectionOption{
otlp.WithGRPCServiceConfig("{}"),
},
},
{
name: "WithGRPCDialOptions",
additionalOpts: []otlp.GRPCConnectionOption{
otlp.WithGRPCDialOption(grpc.WithBlock()),
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
newExporterEndToEndTest(t, test.additionalOpts)
})
}
}
func newGRPCExporter(t *testing.T, ctx context.Context, endpoint string, additionalOpts ...otlp.GRPCConnectionOption) *otlp.Exporter {
opts := []otlp.GRPCConnectionOption{
otlp.WithInsecure(),
otlp.WithEndpoint(endpoint),
otlp.WithReconnectionPeriod(50 * time.Millisecond),
}
opts = append(opts, additionalOpts...)
driver := otlp.NewGRPCDriver(opts...)
exp, err := otlp.NewExporter(ctx, driver)
if err != nil {
t.Fatalf("failed to create a new collector exporter: %v", err)
}
return exp
}
func runEndToEndTest(t *testing.T, ctx context.Context, exp *otlp.Exporter, mcTraces, mcMetrics *mockCollector) {
pOpts := []sdktrace.TracerProviderOption{
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithBatcher(
exp,
// add following two options to ensure flush
sdktrace.WithBatchTimeout(5),
sdktrace.WithMaxExportBatchSize(10),
),
}
tp1 := sdktrace.NewTracerProvider(append(pOpts,
sdktrace.WithResource(resource.NewWithAttributes(
label.String("rk1", "rv11)"),
label.Int64("rk2", 5),
)))...)
tp2 := sdktrace.NewTracerProvider(append(pOpts,
sdktrace.WithResource(resource.NewWithAttributes(
label.String("rk1", "rv12)"),
label.Float32("rk3", 6.5),
)))...)
tr1 := tp1.Tracer("test-tracer1")
tr2 := tp2.Tracer("test-tracer2")
// Now create few spans
m := 4
for i := 0; i < m; i++ {
_, span := tr1.Start(ctx, "AlwaysSample")
span.SetAttributes(label.Int64("i", int64(i)))
span.End()
_, span = tr2.Start(ctx, "AlwaysSample")
span.SetAttributes(label.Int64("i", int64(i)))
span.End()
}
selector := simple.NewWithInexpensiveDistribution()
processor := processor.New(selector, exportmetric.StatelessExportKindSelector())
pusher := push.New(processor, exp)
pusher.Start()
meter := pusher.MeterProvider().Meter("test-meter")
labels := []label.KeyValue{label.Bool("test", true)}
type data struct {
iKind metric.InstrumentKind
nKind number.Kind
val int64
}
instruments := map[string]data{
"test-int64-counter": {metric.CounterInstrumentKind, number.Int64Kind, 1},
"test-float64-counter": {metric.CounterInstrumentKind, number.Float64Kind, 1},
"test-int64-valuerecorder": {metric.ValueRecorderInstrumentKind, number.Int64Kind, 2},
"test-float64-valuerecorder": {metric.ValueRecorderInstrumentKind, number.Float64Kind, 2},
"test-int64-valueobserver": {metric.ValueObserverInstrumentKind, number.Int64Kind, 3},
"test-float64-valueobserver": {metric.ValueObserverInstrumentKind, number.Float64Kind, 3},
}
for name, data := range instruments {
data := data
switch data.iKind {
case metric.CounterInstrumentKind:
switch data.nKind {
case number.Int64Kind:
metric.Must(meter).NewInt64Counter(name).Add(ctx, data.val, labels...)
case number.Float64Kind:
metric.Must(meter).NewFloat64Counter(name).Add(ctx, float64(data.val), labels...)
default:
assert.Failf(t, "unsupported number testing kind", data.nKind.String())
}
case metric.ValueRecorderInstrumentKind:
switch data.nKind {
case number.Int64Kind:
metric.Must(meter).NewInt64ValueRecorder(name).Record(ctx, data.val, labels...)
case number.Float64Kind:
metric.Must(meter).NewFloat64ValueRecorder(name).Record(ctx, float64(data.val), labels...)
default:
assert.Failf(t, "unsupported number testing kind", data.nKind.String())
}
case metric.ValueObserverInstrumentKind:
switch data.nKind {
case number.Int64Kind:
metric.Must(meter).NewInt64ValueObserver(name,
func(_ context.Context, result metric.Int64ObserverResult) {
result.Observe(data.val, labels...)
},
)
case number.Float64Kind:
callback := func(v float64) metric.Float64ObserverFunc {
return metric.Float64ObserverFunc(func(_ context.Context, result metric.Float64ObserverResult) { result.Observe(v, labels...) })
}(float64(data.val))
metric.Must(meter).NewFloat64ValueObserver(name, callback)
default:
assert.Failf(t, "unsupported number testing kind", data.nKind.String())
}
default:
assert.Failf(t, "unsupported metrics testing kind", data.iKind.String())
}
}
// Flush and close.
pusher.Stop()
func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := tp1.Shutdown(ctx); err != nil {
t.Fatalf("failed to shut down a tracer provider 1: %v", err)
}
if err := tp2.Shutdown(ctx); err != nil {
t.Fatalf("failed to shut down a tracer provider 2: %v", err)
}
}()
// Wait >2 cycles.
<-time.After(40 * time.Millisecond)
// Now shutdown the exporter
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf("failed to stop the exporter: %v", err)
}
// Shutdown the collector too so that we can begin
// verification checks of expected data back.
_ = mcTraces.stop()
_ = mcMetrics.stop()
// Now verify that we only got two resources
rss := mcTraces.getResourceSpans()
if got, want := len(rss), 2; got != want {
t.Fatalf("resource span count: got %d, want %d\n", got, want)
}
// Now verify spans and attributes for each resource span.
for _, rs := range rss {
if len(rs.InstrumentationLibrarySpans) == 0 {
t.Fatalf("zero Instrumentation Library Spans")
}
if got, want := len(rs.InstrumentationLibrarySpans[0].Spans), m; got != want {
t.Fatalf("span counts: got %d, want %d", got, want)
}
attrMap := map[int64]bool{}
for _, s := range rs.InstrumentationLibrarySpans[0].Spans {
if gotName, want := s.Name, "AlwaysSample"; gotName != want {
t.Fatalf("span name: got %s, want %s", gotName, want)
}
attrMap[s.Attributes[0].Value.Value.(*commonpb.AnyValue_IntValue).IntValue] = true
}
if got, want := len(attrMap), m; got != want {
t.Fatalf("span attribute unique values: got %d want %d", got, want)
}
for i := 0; i < m; i++ {
_, ok := attrMap[int64(i)]
if !ok {
t.Fatalf("span with attribute %d missing", i)
}
}
}
metrics := mcMetrics.getMetrics()
assert.Len(t, metrics, len(instruments), "not enough metrics exported")
seen := make(map[string]struct{}, len(instruments))
for _, m := range metrics {
data, ok := instruments[m.Name]
if !ok {
assert.Failf(t, "unknown metrics", m.Name)
continue
}
seen[m.Name] = struct{}{}
switch data.iKind {
case metric.CounterInstrumentKind:
switch data.nKind {
case number.Int64Kind:
if dp := m.GetIntSum().DataPoints; assert.Len(t, dp, 1) {
assert.Equal(t, data.val, dp[0].Value, "invalid value for %q", m.Name)
}
case number.Float64Kind:
if dp := m.GetDoubleSum().DataPoints; assert.Len(t, dp, 1) {
assert.Equal(t, float64(data.val), dp[0].Value, "invalid value for %q", m.Name)
}
default:
assert.Failf(t, "invalid number kind", data.nKind.String())
}
case metric.ValueObserverInstrumentKind:
switch data.nKind {
case number.Int64Kind:
if dp := m.GetIntGauge().DataPoints; assert.Len(t, dp, 1) {
assert.Equal(t, data.val, dp[0].Value, "invalid value for %q", m.Name)
}
case number.Float64Kind:
if dp := m.GetDoubleGauge().DataPoints; assert.Len(t, dp, 1) {
assert.Equal(t, float64(data.val), dp[0].Value, "invalid value for %q", m.Name)
}
default:
assert.Failf(t, "invalid number kind", data.nKind.String())
}
case metric.ValueRecorderInstrumentKind:
switch data.nKind {
case number.Int64Kind:
assert.NotNil(t, m.GetIntHistogram())
if dp := m.GetIntHistogram().DataPoints; assert.Len(t, dp, 1) {
count := dp[0].Count
assert.Equal(t, uint64(1), count, "invalid count for %q", m.Name)
assert.Equal(t, int64(data.val*int64(count)), dp[0].Sum, "invalid sum for %q (value %d)", m.Name, data.val)
}
case number.Float64Kind:
assert.NotNil(t, m.GetDoubleHistogram())
if dp := m.GetDoubleHistogram().DataPoints; assert.Len(t, dp, 1) {
count := dp[0].Count
assert.Equal(t, uint64(1), count, "invalid count for %q", m.Name)
assert.Equal(t, float64(data.val*int64(count)), dp[0].Sum, "invalid sum for %q (value %d)", m.Name, data.val)
}
default:
assert.Failf(t, "invalid number kind", data.nKind.String())
}
default:
assert.Failf(t, "invalid metrics kind", data.iKind.String())
}
}
for i := range instruments {
if _, ok := seen[i]; !ok {
assert.Fail(t, fmt.Sprintf("no metric(s) exported for %q", i))
}
}
}
func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionOption) {
mc := runMockCollectorAtEndpoint(t, "localhost:56561")
defer func() {
_ = mc.stop()
}()
<-time.After(5 * time.Millisecond)
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint, additionalOpts...)
defer func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
panic(err)
}
}()
runEndToEndTest(t, ctx, exp, mc, mc)
}
func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
mc := runMockCollector(t)
defer func() {
_ = mc.stop()
}()
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint)
defer func() {
if err := exp.Shutdown(ctx); err != nil {
panic(err)
}
}()
// Invoke Start numerous times, should return errAlreadyStarted
for i := 0; i < 10; i++ {
if err := exp.Start(ctx); err == nil || !strings.Contains(err.Error(), "already started") {
t.Fatalf("#%d unexpected Start error: %v", i, err)
}
}
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf("failed to Shutdown the exporter: %v", err)
}
// Invoke Shutdown numerous times
for i := 0; i < 10; i++ {
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf(`#%d got error (%v) expected none`, i, err)
}
}
}
func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
mc := runMockCollector(t)
reconnectionPeriod := 20 * time.Millisecond
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlp.WithReconnectionPeriod(reconnectionPeriod))
defer func() {
_ = exp.Shutdown(ctx)
}()
// We'll now stop the collector right away to simulate a connection
// dying in the midst of communication or even not existing before.
_ = mc.stop()
// In the test below, we'll stop the collector many times,
// while exporting traces and test to ensure that we can
// reconnect.
for j := 0; j < 3; j++ {
// No endpoint up.
require.Error(
t,
exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused",
mc.endpoint,
)
// Now resurrect the collector by making a new one but reusing the
// old endpoint, and the collector should reconnect automatically.
nmc := runMockCollectorAtEndpoint(t, mc.endpoint)
// Give the exporter sometime to reconnect
<-time.After(reconnectionPeriod * 4)
n := 10
for i := 0; i < n; i++ {
require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "Resurrected"}}))
}
nmaSpans := nmc.getSpans()
// Expecting 10 SpanSnapshots that were sampled, given that
if g, w := len(nmaSpans), n; g != w {
t.Fatalf("Round #%d: Connected collector: spans: got %d want %d", j, g, w)
}
dSpans := mc.getSpans()
// Expecting 0 spans to have been received by the original but now dead collector
if g, w := len(dSpans), 0; g != w {
t.Fatalf("Round #%d: Disconnected collector: spans: got %d want %d", j, g, w)
}
_ = nmc.stop()
}
}
// This test takes a long time to run: to skip it, run tests using: -short
func TestNewExporter_collectorOnBadConnection(t *testing.T) {
if testing.Short() {
t.Skipf("Skipping this long running test")
}
ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to grab an available port: %v", err)
}
// Firstly close the "collector's" channel: optimistically this endpoint won't get reused ASAP
// However, our goal of closing it is to simulate an unavailable connection
_ = ln.Close()
_, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String())
endpoint := fmt.Sprintf("localhost:%s", collectorPortStr)
ctx := context.Background()
exp := newGRPCExporter(t, ctx, endpoint)
_ = exp.Shutdown(ctx)
}
func TestNewExporter_withEndpoint(t *testing.T) {
mc := runMockCollector(t)
defer func() {
_ = mc.stop()
}()
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint)
_ = exp.Shutdown(ctx)
}
func TestNewExporter_withHeaders(t *testing.T) {
mc := runMockCollector(t)
defer func() {
_ = mc.stop()
}()
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlp.WithHeaders(map[string]string{"header1": "value1"}))
require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}}))
defer func() {
_ = exp.Shutdown(ctx)
}()
headers := mc.getHeaders()
require.Len(t, headers.Get("header1"), 1)
assert.Equal(t, "value1", headers.Get("header1")[0])
}
func TestNewExporter_withMultipleAttributeTypes(t *testing.T) {
mc := runMockCollector(t)
defer func() {
_ = mc.stop()
}()
<-time.After(5 * time.Millisecond)
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint)
defer func() {
_ = exp.Shutdown(ctx)
}()
tp := sdktrace.NewTracerProvider(
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithBatcher(
exp,
// add following two options to ensure flush
sdktrace.WithBatchTimeout(5),
sdktrace.WithMaxExportBatchSize(10),
),
)
defer func() { _ = tp.Shutdown(ctx) }()
tr := tp.Tracer("test-tracer")
testKvs := []label.KeyValue{
label.Int("Int", 1),
label.Int32("Int32", int32(2)),
label.Int64("Int64", int64(3)),
label.Float32("Float32", float32(1.11)),
label.Float64("Float64", 2.22),
label.Bool("Bool", true),
label.String("String", "test"),
}
_, span := tr.Start(ctx, "AlwaysSample")
span.SetAttributes(testKvs...)
span.End()
// Flush and close.
func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := tp.Shutdown(ctx); err != nil {
t.Fatalf("failed to shut down a tracer provider: %v", err)
}
}()
// Wait >2 cycles.
<-time.After(40 * time.Millisecond)
// Now shutdown the exporter
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf("failed to stop the exporter: %v", err)
}
// Shutdown the collector too so that we can begin
// verification checks of expected data back.
_ = mc.stop()
// Now verify that we only got one span
rss := mc.getSpans()
if got, want := len(rss), 1; got != want {
t.Fatalf("resource span count: got %d, want %d\n", got, want)
}
expected := []*commonpb.KeyValue{
{
Key: "Int",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_IntValue{
IntValue: 1,
},
},
},
{
Key: "Int32",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_IntValue{
IntValue: 2,
},
},
},
{
Key: "Int64",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_IntValue{
IntValue: 3,
},
},
},
{
Key: "Float32",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_DoubleValue{
DoubleValue: 1.11,
},
},
},
{
Key: "Float64",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_DoubleValue{
DoubleValue: 2.22,
},
},
},
{
Key: "Bool",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_BoolValue{
BoolValue: true,
},
},
},
{
Key: "String",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_StringValue{
StringValue: "test",
},
},
},
}
// Verify attributes
if !assert.Len(t, rss[0].Attributes, len(expected)) {
t.Fatalf("attributes count: got %d, want %d\n", len(rss[0].Attributes), len(expected))
}
for i, actual := range rss[0].Attributes {
if a, ok := actual.Value.Value.(*commonpb.AnyValue_DoubleValue); ok {
e, ok := expected[i].Value.Value.(*commonpb.AnyValue_DoubleValue)
if !ok {
t.Errorf("expected AnyValue_DoubleValue, got %T", expected[i].Value.Value)
continue
}
if !assert.InDelta(t, e.DoubleValue, a.DoubleValue, 0.01) {
continue
}
e.DoubleValue = a.DoubleValue
}
assert.Equal(t, expected[i], actual)
}
}
type discCheckpointSet struct{}
func (discCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelector, recordFunc func(exportmetric.Record) error) error {
desc := metric.NewDescriptor(
"foo",
metric.CounterInstrumentKind,
number.Int64Kind,
)
res := resource.NewWithAttributes(label.String("a", "b"))
agg := sum.New(1)
start := time.Now().Add(-20 * time.Minute)
end := time.Now()
labels := label.NewSet()
rec := exportmetric.NewRecord(&desc, &labels, res, agg[0].Aggregation(), start, end)
return recordFunc(rec)
}
func (discCheckpointSet) Lock() {}
func (discCheckpointSet) Unlock() {}
func (discCheckpointSet) RLock() {}
func (discCheckpointSet) RUnlock() {}
func discSpanSnapshot() *exporttrace.SpanSnapshot {
return &exporttrace.SpanSnapshot{
SpanContext: trace.SpanContext{
TraceID: trace.TraceID{2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9},
SpanID: trace.SpanID{3, 4, 5, 6, 7, 8, 9, 0},
TraceFlags: trace.FlagsSampled,
},
ParentSpanID: trace.SpanID{1, 2, 3, 4, 5, 6, 7, 8},
SpanKind: trace.SpanKindInternal,
Name: "foo",
StartTime: time.Now().Add(-20 * time.Minute),
EndTime: time.Now(),
Attributes: []label.KeyValue{},
MessageEvents: []exporttrace.Event{},
Links: []trace.Link{},
StatusCode: codes.Ok,
StatusMessage: "",
HasRemoteParent: false,
DroppedAttributeCount: 0,
DroppedMessageEventCount: 0,
DroppedLinkCount: 0,
ChildSpanCount: 0,
Resource: resource.NewWithAttributes(label.String("a", "b")),
InstrumentationLibrary: instrumentation.Library{
Name: "bar",
Version: "0.0.0",
},
}
}
func TestDisconnected(t *testing.T) {
ctx := context.Background()
// The endpoint is whatever, we want to be disconnected. But we
// setting a blocking connection, so dialing to the invalid
// endpoint actually fails.
exp := newGRPCExporter(t, ctx, "invalid",
otlp.WithReconnectionPeriod(time.Hour),
otlp.WithGRPCDialOption(
grpc.WithBlock(),
grpc.FailOnNonTempDialError(true),
),
)
defer func() {
assert.NoError(t, exp.Shutdown(ctx))
}()
assert.Error(t, exp.Export(ctx, discCheckpointSet{}))
assert.Error(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{discSpanSnapshot()}))
}
type emptyCheckpointSet struct{}
func (emptyCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelector, recordFunc func(exportmetric.Record) error) error {
return nil
}
func (emptyCheckpointSet) Lock() {}
func (emptyCheckpointSet) Unlock() {}
func (emptyCheckpointSet) RLock() {}
func (emptyCheckpointSet) RUnlock() {}
func TestEmptyData(t *testing.T) {
mc := runMockCollectorAtEndpoint(t, "localhost:56561")
defer func() {
_ = mc.stop()
}()
<-time.After(5 * time.Millisecond)
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint)
defer func() {
assert.NoError(t, exp.Shutdown(ctx))
}()
assert.NoError(t, exp.ExportSpans(ctx, nil))
assert.NoError(t, exp.Export(ctx, emptyCheckpointSet{}))
}
type failCheckpointSet struct{}
func (failCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelector, recordFunc func(exportmetric.Record) error) error {
return fmt.Errorf("fail")
}
func (failCheckpointSet) Lock() {}
func (failCheckpointSet) Unlock() {}
func (failCheckpointSet) RLock() {}
func (failCheckpointSet) RUnlock() {}
func TestFailedMetricTransform(t *testing.T) {
mc := runMockCollectorAtEndpoint(t, "localhost:56561")
defer func() {
_ = mc.stop()
}()
<-time.After(5 * time.Millisecond)
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint)
defer func() {
assert.NoError(t, exp.Shutdown(ctx))
}()
assert.Error(t, exp.Export(ctx, failCheckpointSet{}))
}
func TestMultiConnectionDriver(t *testing.T) {
mcTraces := runMockCollector(t)
mcMetrics := runMockCollector(t)
defer func() {
_ = mcTraces.stop()
_ = mcMetrics.stop()
}()
<-time.After(5 * time.Millisecond)
commonOpts := []otlp.GRPCConnectionOption{
otlp.WithInsecure(),
otlp.WithReconnectionPeriod(50 * time.Millisecond),
otlp.WithGRPCDialOption(grpc.WithBlock()),
}
optsTraces := append([]otlp.GRPCConnectionOption{
otlp.WithEndpoint(mcTraces.endpoint),
}, commonOpts...)
optsMetrics := append([]otlp.GRPCConnectionOption{
otlp.WithEndpoint(mcMetrics.endpoint),
}, commonOpts...)
tracesDriver := otlp.NewGRPCDriver(optsTraces...)
metricsDriver := otlp.NewGRPCDriver(optsMetrics...)
splitCfg := otlp.SplitConfig{
ForMetrics: metricsDriver,
ForTraces: tracesDriver,
}
driver := otlp.NewSplitDriver(splitCfg)
ctx := context.Background()
exp, err := otlp.NewExporter(ctx, driver)
if err != nil {
t.Fatalf("failed to create a new collector exporter: %v", err)
}
defer func() {
assert.NoError(t, exp.Shutdown(ctx))
}()
runEndToEndTest(t, ctx, exp, mcTraces, mcMetrics)
}

View File

@ -28,11 +28,38 @@ import (
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1"
"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
)
func stubSpanSnapshot(count int) []*tracesdk.SpanSnapshot {
spans := make([]*tracesdk.SpanSnapshot, 0, count)
for i := 0; i < count; i++ {
spans = append(spans, new(tracesdk.SpanSnapshot))
}
return spans
}
type stubCheckpointSet struct {
limit int
}
var _ metricsdk.CheckpointSet = stubCheckpointSet{}
func (s stubCheckpointSet) ForEach(kindSelector metricsdk.ExportKindSelector, recordFunc func(metricsdk.Record) error) error {
for i := 0; i < s.limit; i++ {
if err := recordFunc(metricsdk.Record{}); err != nil {
return err
}
}
return nil
}
func (stubCheckpointSet) Lock() {}
func (stubCheckpointSet) Unlock() {}
func (stubCheckpointSet) RLock() {}
func (stubCheckpointSet) RUnlock() {}
type stubProtocolDriver struct {
started int
stopped int
@ -42,8 +69,8 @@ type stubProtocolDriver struct {
injectedStartError error
injectedStopError error
rm []metricpb.ResourceMetrics
rs []tracepb.ResourceSpans
rm []metricsdk.Record
rs []tracesdk.SpanSnapshot
}
var _ otlp.ProtocolDriver = (*stubProtocolDriver)(nil)
@ -70,6 +97,39 @@ func (m *stubProtocolDriver) Stop(ctx context.Context) error {
func (m *stubProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error {
m.metricsExported++
return cps.ForEach(selector, func(record metricsdk.Record) error {
m.rm = append(m.rm, record)
return nil
})
}
func (m *stubProtocolDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error {
m.tracesExported++
for _, rs := range ss {
if rs == nil {
continue
}
m.rs = append(m.rs, *rs)
}
return nil
}
type stubTransformingProtocolDriver struct {
rm []metricpb.ResourceMetrics
rs []tracepb.ResourceSpans
}
var _ otlp.ProtocolDriver = (*stubTransformingProtocolDriver)(nil)
func (m *stubTransformingProtocolDriver) Start(ctx context.Context) error {
return nil
}
func (m *stubTransformingProtocolDriver) Stop(ctx context.Context) error {
return nil
}
func (m *stubTransformingProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error {
rms, err := transform.CheckpointSet(parent, selector, cps, 1)
if err != nil {
return err
@ -83,8 +143,7 @@ func (m *stubProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk
return nil
}
func (m *stubProtocolDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error {
m.tracesExported++
func (m *stubTransformingProtocolDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error {
for _, rs := range transform.SpanData(ss) {
if rs == nil {
continue
@ -94,13 +153,13 @@ func (m *stubProtocolDriver) ExportTraces(ctx context.Context, ss []*tracesdk.Sp
return nil
}
func (m *stubProtocolDriver) Reset() {
func (m *stubTransformingProtocolDriver) Reset() {
m.rm = nil
m.rs = nil
}
func newExporter(t *testing.T, opts ...otlp.ExporterOption) (*otlp.Exporter, *stubProtocolDriver) {
driver := &stubProtocolDriver{}
func newExporter(t *testing.T, opts ...otlp.ExporterOption) (*otlp.Exporter, *stubTransformingProtocolDriver) {
driver := &stubTransformingProtocolDriver{}
exp, err := otlp.NewExporter(context.Background(), driver, opts...)
require.NoError(t, err)
return exp, driver
@ -204,11 +263,13 @@ func TestSplitDriver(t *testing.T) {
assert.Equal(t, 0, driverMetrics.tracesExported)
assert.Equal(t, 0, driverMetrics.metricsExported)
assert.NoError(t, driver.ExportMetrics(ctx, discCheckpointSet{}, metricsdk.StatelessExportKindSelector()))
assert.NoError(t, driver.ExportTraces(ctx, []*tracesdk.SpanSnapshot{discSpanSnapshot()}))
recordCount := 5
spanCount := 7
assert.NoError(t, driver.ExportMetrics(ctx, stubCheckpointSet{recordCount}, metricsdk.StatelessExportKindSelector()))
assert.NoError(t, driver.ExportTraces(ctx, stubSpanSnapshot(spanCount)))
assert.Len(t, driverTraces.rm, 0)
assert.Len(t, driverTraces.rs, 1)
assert.Len(t, driverMetrics.rm, 1)
assert.Len(t, driverTraces.rs, spanCount)
assert.Len(t, driverMetrics.rm, recordCount)
assert.Len(t, driverMetrics.rs, 0)
assert.Equal(t, 1, driverTraces.tracesExported)
assert.Equal(t, 0, driverTraces.metricsExported)

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package otlp
package otlpgrpc
import (
"os"
@ -26,8 +26,8 @@ import (
func TestMain(m *testing.M) {
fields := []ottest.FieldOffset{
{
Name: "grpcConnection.lastConnectErrPtr",
Offset: unsafe.Offsetof(grpcConnection{}.lastConnectErrPtr),
Name: "connection.lastConnectErrPtr",
Offset: unsafe.Offsetof(connection{}.lastConnectErrPtr),
},
}
if !ottest.Aligned8Byte(fields, os.Stderr) {

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package otlp // import "go.opentelemetry.io/otel/exporters/otlp"
package otlpgrpc
import (
"context"
@ -26,7 +26,7 @@ import (
"google.golang.org/grpc/metadata"
)
type grpcConnection struct {
type connection struct {
// Ensure pointer is 64-bit aligned for atomic operations on both 32 and 64 bit machines.
lastConnectErrPtr unsafe.Pointer
@ -36,7 +36,7 @@ type grpcConnection struct {
cc *grpc.ClientConn
// these fields are read-only after constructor is finished
c grpcConnectionConfig
cfg config
metadata metadata.MD
newConnectionHandler func(cc *grpc.ClientConn)
@ -51,73 +51,73 @@ type grpcConnection struct {
closeBackgroundConnectionDoneCh func(ch chan struct{})
}
func newGRPCConnection(c grpcConnectionConfig, handler func(cc *grpc.ClientConn)) *grpcConnection {
conn := new(grpcConnection)
conn.newConnectionHandler = handler
conn.c = c
if len(conn.c.headers) > 0 {
conn.metadata = metadata.New(conn.c.headers)
func newConnection(cfg config, handler func(cc *grpc.ClientConn)) *connection {
c := new(connection)
c.newConnectionHandler = handler
c.cfg = cfg
if len(c.cfg.headers) > 0 {
c.metadata = metadata.New(c.cfg.headers)
}
conn.closeBackgroundConnectionDoneCh = func(ch chan struct{}) {
c.closeBackgroundConnectionDoneCh = func(ch chan struct{}) {
close(ch)
}
return conn
return c
}
func (oc *grpcConnection) startConnection(ctx context.Context) {
oc.stopCh = make(chan struct{})
oc.disconnectedCh = make(chan bool)
oc.backgroundConnectionDoneCh = make(chan struct{})
func (c *connection) startConnection(ctx context.Context) {
c.stopCh = make(chan struct{})
c.disconnectedCh = make(chan bool)
c.backgroundConnectionDoneCh = make(chan struct{})
if err := oc.connect(ctx); err == nil {
oc.setStateConnected()
if err := c.connect(ctx); err == nil {
c.setStateConnected()
} else {
oc.setStateDisconnected(err)
c.setStateDisconnected(err)
}
go oc.indefiniteBackgroundConnection()
go c.indefiniteBackgroundConnection()
}
func (oc *grpcConnection) lastConnectError() error {
errPtr := (*error)(atomic.LoadPointer(&oc.lastConnectErrPtr))
func (c *connection) lastConnectError() error {
errPtr := (*error)(atomic.LoadPointer(&c.lastConnectErrPtr))
if errPtr == nil {
return nil
}
return *errPtr
}
func (oc *grpcConnection) saveLastConnectError(err error) {
func (c *connection) saveLastConnectError(err error) {
var errPtr *error
if err != nil {
errPtr = &err
}
atomic.StorePointer(&oc.lastConnectErrPtr, unsafe.Pointer(errPtr))
atomic.StorePointer(&c.lastConnectErrPtr, unsafe.Pointer(errPtr))
}
func (oc *grpcConnection) setStateDisconnected(err error) {
oc.saveLastConnectError(err)
func (c *connection) setStateDisconnected(err error) {
c.saveLastConnectError(err)
select {
case oc.disconnectedCh <- true:
case c.disconnectedCh <- true:
default:
}
oc.newConnectionHandler(nil)
c.newConnectionHandler(nil)
}
func (oc *grpcConnection) setStateConnected() {
oc.saveLastConnectError(nil)
func (c *connection) setStateConnected() {
c.saveLastConnectError(nil)
}
func (oc *grpcConnection) connected() bool {
return oc.lastConnectError() == nil
func (c *connection) connected() bool {
return c.lastConnectError() == nil
}
const defaultConnReattemptPeriod = 10 * time.Second
func (oc *grpcConnection) indefiniteBackgroundConnection() {
func (c *connection) indefiniteBackgroundConnection() {
defer func() {
oc.closeBackgroundConnectionDoneCh(oc.backgroundConnectionDoneCh)
c.closeBackgroundConnectionDoneCh(c.backgroundConnectionDoneCh)
}()
connReattemptPeriod := oc.c.reconnectionPeriod
connReattemptPeriod := c.cfg.reconnectionPeriod
if connReattemptPeriod <= 0 {
connReattemptPeriod = defaultConnReattemptPeriod
}
@ -136,14 +136,14 @@ func (oc *grpcConnection) indefiniteBackgroundConnection() {
// 2. Otherwise block until we are disconnected, and
// then retry connecting
select {
case <-oc.stopCh:
case <-c.stopCh:
return
case <-oc.disconnectedCh:
case <-c.disconnectedCh:
// Quickly check if we haven't stopped at the
// same time.
select {
case <-oc.stopCh:
case <-c.stopCh:
return
default:
@ -152,10 +152,10 @@ func (oc *grpcConnection) indefiniteBackgroundConnection() {
// Normal scenario that we'll wait for
}
if err := oc.connect(context.Background()); err == nil {
oc.setStateConnected()
if err := c.connect(context.Background()); err == nil {
c.setStateConnected()
} else {
oc.setStateDisconnected(err)
c.setStateDisconnected(err)
}
// Apply some jitter to avoid lockstep retrials of other
@ -163,89 +163,89 @@ func (oc *grpcConnection) indefiniteBackgroundConnection() {
// innocent DDOS, by clogging the machine's resources and network.
jitter := time.Duration(rng.Int63n(maxJitterNanos))
select {
case <-oc.stopCh:
case <-c.stopCh:
return
case <-time.After(connReattemptPeriod + jitter):
}
}
}
func (oc *grpcConnection) connect(ctx context.Context) error {
cc, err := oc.dialToCollector(ctx)
func (c *connection) connect(ctx context.Context) error {
cc, err := c.dialToCollector(ctx)
if err != nil {
return err
}
oc.setConnection(cc)
oc.newConnectionHandler(cc)
c.setConnection(cc)
c.newConnectionHandler(cc)
return nil
}
// setConnection sets cc as the client connection and returns true if
// the connection state changed.
func (oc *grpcConnection) setConnection(cc *grpc.ClientConn) bool {
oc.mu.Lock()
defer oc.mu.Unlock()
func (c *connection) setConnection(cc *grpc.ClientConn) bool {
c.mu.Lock()
defer c.mu.Unlock()
// If previous clientConn is same as the current then just return.
// This doesn't happen right now as this func is only called with new ClientConn.
// It is more about future-proofing.
if oc.cc == cc {
if c.cc == cc {
return false
}
// If the previous clientConn was non-nil, close it
if oc.cc != nil {
_ = oc.cc.Close()
if c.cc != nil {
_ = c.cc.Close()
}
oc.cc = cc
c.cc = cc
return true
}
func (oc *grpcConnection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) {
endpoint := oc.c.collectorEndpoint
func (c *connection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) {
endpoint := c.cfg.collectorEndpoint
dialOpts := []grpc.DialOption{}
if oc.c.grpcServiceConfig != "" {
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(oc.c.grpcServiceConfig))
if c.cfg.serviceConfig != "" {
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(c.cfg.serviceConfig))
}
if oc.c.clientCredentials != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(oc.c.clientCredentials))
} else if oc.c.canDialInsecure {
if c.cfg.clientCredentials != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(c.cfg.clientCredentials))
} else if c.cfg.canDialInsecure {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
if oc.c.compressor != "" {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(oc.c.compressor)))
if c.cfg.compressor != "" {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(c.cfg.compressor)))
}
if len(oc.c.grpcDialOptions) != 0 {
dialOpts = append(dialOpts, oc.c.grpcDialOptions...)
if len(c.cfg.dialOptions) != 0 {
dialOpts = append(dialOpts, c.cfg.dialOptions...)
}
ctx, cancel := oc.contextWithStop(ctx)
ctx, cancel := c.contextWithStop(ctx)
defer cancel()
ctx = oc.contextWithMetadata(ctx)
ctx = c.contextWithMetadata(ctx)
return grpc.DialContext(ctx, endpoint, dialOpts...)
}
func (oc *grpcConnection) contextWithMetadata(ctx context.Context) context.Context {
if oc.metadata.Len() > 0 {
return metadata.NewOutgoingContext(ctx, oc.metadata)
func (c *connection) contextWithMetadata(ctx context.Context) context.Context {
if c.metadata.Len() > 0 {
return metadata.NewOutgoingContext(ctx, c.metadata)
}
return ctx
}
func (oc *grpcConnection) shutdown(ctx context.Context) error {
close(oc.stopCh)
func (c *connection) shutdown(ctx context.Context) error {
close(c.stopCh)
// Ensure that the backgroundConnector returns
select {
case <-oc.backgroundConnectionDoneCh:
case <-c.backgroundConnectionDoneCh:
case <-ctx.Done():
return ctx.Err()
}
oc.mu.Lock()
cc := oc.cc
oc.cc = nil
oc.mu.Unlock()
c.mu.Lock()
cc := c.cc
c.cc = nil
c.mu.Unlock()
if cc != nil {
return cc.Close()
@ -254,7 +254,7 @@ func (oc *grpcConnection) shutdown(ctx context.Context) error {
return nil
}
func (oc *grpcConnection) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) {
func (c *connection) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) {
// Unify the parent context Done signal with the connection's
// stop channel.
ctx, cancel := context.WithCancel(ctx)
@ -263,7 +263,7 @@ func (oc *grpcConnection) contextWithStop(ctx context.Context) (context.Context,
case <-ctx.Done():
// Nothing to do, either cancelled or deadline
// happened.
case <-oc.stopCh:
case <-c.stopCh:
cancel()
}
}(ctx, cancel)

View File

@ -0,0 +1,25 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Package otlpgrpc provides an implementation of otlp.ProtocolDriver
that connects to the collector and sends traces and metrics using
gRPC.
This package is currently in a pre-GA phase. Backwards incompatible
changes may be introduced in subsequent minor version releases as we
work to track the evolving OpenTelemetry specification and user
feedback.
*/
package otlpgrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"

View File

@ -12,15 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package otlp // import "go.opentelemetry.io/otel/exporters/otlp"
package otlpgrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
import (
"context"
"errors"
"fmt"
"sync"
"google.golang.org/grpc"
"go.opentelemetry.io/otel/exporters/otlp"
colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1"
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
@ -30,28 +32,34 @@ import (
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
)
type grpcDriver struct {
connection *grpcConnection
type driver struct {
connection *connection
lock sync.Mutex
metricsClient colmetricpb.MetricsServiceClient
tracesClient coltracepb.TraceServiceClient
}
func NewGRPCDriver(opts ...GRPCConnectionOption) ProtocolDriver {
cfg := grpcConnectionConfig{
collectorEndpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort),
grpcServiceConfig: DefaultGRPCServiceConfig,
var (
errNoClient = errors.New("no client")
errDisconnected = errors.New("exporter disconnected")
)
// NewDriver creates a new gRPC protocol driver.
func NewDriver(opts ...Option) otlp.ProtocolDriver {
cfg := config{
collectorEndpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort),
serviceConfig: DefaultServiceConfig,
}
for _, opt := range opts {
opt(&cfg)
}
d := &grpcDriver{}
d.connection = newGRPCConnection(cfg, d.handleNewConnection)
d := &driver{}
d.connection = newConnection(cfg, d.handleNewConnection)
return d
}
func (d *grpcDriver) handleNewConnection(cc *grpc.ClientConn) {
func (d *driver) handleNewConnection(cc *grpc.ClientConn) {
d.lock.Lock()
defer d.lock.Unlock()
if cc != nil {
@ -63,16 +71,22 @@ func (d *grpcDriver) handleNewConnection(cc *grpc.ClientConn) {
}
}
func (d *grpcDriver) Start(ctx context.Context) error {
// Start implements otlp.ProtocolDriver. It establishes a connection
// to the collector.
func (d *driver) Start(ctx context.Context) error {
d.connection.startConnection(ctx)
return nil
}
func (d *grpcDriver) Stop(ctx context.Context) error {
// Stop implements otlp.ProtocolDriver. It shuts down the connection
// to the collector.
func (d *driver) Stop(ctx context.Context) error {
return d.connection.shutdown(ctx)
}
func (d *grpcDriver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error {
// ExportMetrics implements otlp.ProtocolDriver. It transforms metrics
// to protobuf binary format and sends the result to the collector.
func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error {
if !d.connection.connected() {
return errDisconnected
}
@ -90,7 +104,7 @@ func (d *grpcDriver) ExportMetrics(ctx context.Context, cps metricsdk.Checkpoint
return d.uploadMetrics(ctx, rms)
}
func (d *grpcDriver) uploadMetrics(ctx context.Context, protoMetrics []*metricpb.ResourceMetrics) error {
func (d *driver) uploadMetrics(ctx context.Context, protoMetrics []*metricpb.ResourceMetrics) error {
ctx = d.connection.contextWithMetadata(ctx)
err := func() error {
d.lock.Lock()
@ -109,7 +123,9 @@ func (d *grpcDriver) uploadMetrics(ctx context.Context, protoMetrics []*metricpb
return err
}
func (d *grpcDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error {
// ExportTraces implements otlp.ProtocolDriver. It transforms spans to
// protobuf binary format and sends the result to the collector.
func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error {
if !d.connection.connected() {
return errDisconnected
}
@ -124,7 +140,7 @@ func (d *grpcDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapsh
return d.uploadTraces(ctx, protoSpans)
}
func (d *grpcDriver) uploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error {
func (d *driver) uploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error {
ctx = d.connection.contextWithMetadata(ctx)
err := func() error {
d.lock.Lock()

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package otlp_test
package otlpgrpc_test
import (
"context"
@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp"
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
@ -33,7 +34,7 @@ import (
func Example_insecure() {
ctx := context.Background()
driver := otlp.NewGRPCDriver(otlp.WithInsecure())
driver := otlpgrpc.NewDriver(otlpgrpc.WithInsecure())
exp, err := otlp.NewExporter(ctx, driver)
if err != nil {
log.Fatalf("Failed to create the collector exporter: %v", err)
@ -86,7 +87,7 @@ func Example_withTLS() {
}
ctx := context.Background()
driver := otlp.NewGRPCDriver(otlp.WithTLSCredentials(creds))
driver := otlpgrpc.NewDriver(otlpgrpc.WithTLSCredentials(creds))
exp, err := otlp.NewExporter(ctx, driver)
if err != nil {
log.Fatalf("failed to create the collector exporter: %v", err)
@ -133,13 +134,13 @@ func Example_withTLS() {
func Example_withDifferentSignalCollectors() {
// Set different endpoints for the metrics and traces collectors
metricsDriver := otlp.NewGRPCDriver(
otlp.WithInsecure(),
otlp.WithEndpoint("localhost:30080"),
metricsDriver := otlpgrpc.NewDriver(
otlpgrpc.WithInsecure(),
otlpgrpc.WithEndpoint("localhost:30080"),
)
tracesDriver := otlp.NewGRPCDriver(
otlp.WithInsecure(),
otlp.WithEndpoint("localhost:30082"),
tracesDriver := otlpgrpc.NewDriver(
otlpgrpc.WithInsecure(),
otlpgrpc.WithEndpoint("localhost:30082"),
)
splitCfg := otlp.SplitConfig{
ForMetrics: metricsDriver,

View File

@ -12,13 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package otlp_test
package otlpgrpc_test
import (
"context"
"fmt"
"net"
"sort"
"sync"
"testing"
"time"
@ -28,121 +27,73 @@ import (
collectormetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
collectortracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1"
commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1"
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1"
tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1"
"go.opentelemetry.io/otel/exporters/otlp/internal/otlptest"
)
func makeMockCollector(t *testing.T) *mockCollector {
return &mockCollector{
t: t,
traceSvc: &mockTraceService{
rsm: map[string]*tracepb.ResourceSpans{},
storage: otlptest.NewSpansStorage(),
},
metricSvc: &mockMetricService{
storage: otlptest.NewMetricsStorage(),
},
metricSvc: &mockMetricService{},
}
}
type mockTraceService struct {
mu sync.RWMutex
rsm map[string]*tracepb.ResourceSpans
storage otlptest.SpansStorage
headers metadata.MD
}
func (mts *mockTraceService) getHeaders() metadata.MD {
mts.mu.RLock()
defer mts.mu.RUnlock()
return mts.headers
}
func (mts *mockTraceService) getSpans() []*tracepb.Span {
mts.mu.RLock()
defer mts.mu.RUnlock()
spans := []*tracepb.Span{}
for _, rs := range mts.rsm {
spans = append(spans, rs.InstrumentationLibrarySpans[0].Spans...)
}
return spans
return mts.storage.GetSpans()
}
func (mts *mockTraceService) getResourceSpans() []*tracepb.ResourceSpans {
mts.mu.RLock()
defer mts.mu.RUnlock()
rss := make([]*tracepb.ResourceSpans, 0, len(mts.rsm))
for _, rs := range mts.rsm {
rss = append(rss, rs)
}
return rss
return mts.storage.GetResourceSpans()
}
func (mts *mockTraceService) Export(ctx context.Context, exp *collectortracepb.ExportTraceServiceRequest) (*collectortracepb.ExportTraceServiceResponse, error) {
reply := &collectortracepb.ExportTraceServiceResponse{}
mts.mu.Lock()
mts.headers, _ = metadata.FromIncomingContext(ctx)
defer mts.mu.Unlock()
rss := exp.GetResourceSpans()
for _, rs := range rss {
rstr := resourceString(rs.Resource)
existingRs, ok := mts.rsm[rstr]
if !ok {
mts.rsm[rstr] = rs
// TODO (rghetia): Add support for library Info.
if len(rs.InstrumentationLibrarySpans) == 0 {
rs.InstrumentationLibrarySpans = []*tracepb.InstrumentationLibrarySpans{
{
Spans: []*tracepb.Span{},
},
}
}
} else {
if len(rs.InstrumentationLibrarySpans) > 0 {
existingRs.InstrumentationLibrarySpans[0].Spans =
append(existingRs.InstrumentationLibrarySpans[0].Spans,
rs.InstrumentationLibrarySpans[0].GetSpans()...)
}
}
}
return &collectortracepb.ExportTraceServiceResponse{}, nil
}
func resourceString(res *resourcepb.Resource) string {
sAttrs := sortedAttributes(res.GetAttributes())
rstr := ""
for _, attr := range sAttrs {
rstr = rstr + attr.String()
}
return rstr
}
func sortedAttributes(attrs []*commonpb.KeyValue) []*commonpb.KeyValue {
sort.Slice(attrs[:], func(i, j int) bool {
return attrs[i].Key < attrs[j].Key
})
return attrs
mts.headers, _ = metadata.FromIncomingContext(ctx)
mts.storage.AddSpans(exp)
return reply, nil
}
type mockMetricService struct {
mu sync.RWMutex
metrics []*metricpb.Metric
storage otlptest.MetricsStorage
}
func (mms *mockMetricService) getMetrics() []*metricpb.Metric {
// copy in order to not change.
m := make([]*metricpb.Metric, 0, len(mms.metrics))
mms.mu.RLock()
defer mms.mu.RUnlock()
return append(m, mms.metrics...)
return mms.storage.GetMetrics()
}
func (mms *mockMetricService) Export(ctx context.Context, exp *collectormetricpb.ExportMetricsServiceRequest) (*collectormetricpb.ExportMetricsServiceResponse, error) {
reply := &collectormetricpb.ExportMetricsServiceResponse{}
mms.mu.Lock()
for _, rm := range exp.GetResourceMetrics() {
// TODO (rghetia) handle multiple resource and library info.
if len(rm.InstrumentationLibraryMetrics) > 0 {
mms.metrics = append(mms.metrics, rm.InstrumentationLibraryMetrics[0].Metrics...)
}
}
mms.mu.Unlock()
return &collectormetricpb.ExportMetricsServiceResponse{}, nil
defer mms.mu.Unlock()
mms.storage.AddMetrics(exp)
return reply, nil
}
type mockCollector struct {
@ -191,6 +142,10 @@ func (mc *mockCollector) stop() error {
return err
}
func (mc *mockCollector) Stop() error {
return mc.stop()
}
func (mc *mockCollector) getSpans() []*tracepb.Span {
return mc.traceSvc.getSpans()
}
@ -199,6 +154,10 @@ func (mc *mockCollector) getResourceSpans() []*tracepb.ResourceSpans {
return mc.traceSvc.getResourceSpans()
}
func (mc *mockCollector) GetResourceSpans() []*tracepb.ResourceSpans {
return mc.getResourceSpans()
}
func (mc *mockCollector) getHeaders() metadata.MD {
return mc.traceSvc.getHeaders()
}
@ -207,6 +166,10 @@ func (mc *mockCollector) getMetrics() []*metricpb.Metric {
return mc.metricSvc.getMetrics()
}
func (mc *mockCollector) GetMetrics() []*metricpb.Metric {
return mc.getMetrics()
}
// runMockCollector is a helper function to create a mock Collector
func runMockCollector(t *testing.T) *mockCollector {
return runMockCollectorAtEndpoint(t, "localhost:0")

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package otlp // import "go.opentelemetry.io/otel/exporters/otlp"
package otlpgrpc
import (
"time"
@ -22,7 +22,7 @@ import (
)
const (
// DefaultGRPCServiceConfig is the gRPC service config used if none is
// DefaultServiceConfig is the gRPC service config used if none is
// provided by the user.
//
// For more info on gRPC service configs:
@ -34,7 +34,7 @@ const (
// Note: MaxAttempts > 5 are treated as 5. See
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#validation-of-retrypolicy
// for more details.
DefaultGRPCServiceConfig = `{
DefaultServiceConfig = `{
"methodConfig":[{
"name":[
{ "service":"opentelemetry.proto.collector.metrics.v1.MetricsService" },
@ -46,7 +46,6 @@ const (
"MaxBackoff":"5s",
"BackoffMultiplier":2,
"RetryableStatusCodes":[
"UNAVAILABLE",
"CANCELLED",
"DEADLINE_EXCEEDED",
"RESOURCE_EXHAUSTED",
@ -60,24 +59,25 @@ const (
}`
)
type grpcConnectionConfig struct {
type config struct {
canDialInsecure bool
collectorEndpoint string
compressor string
reconnectionPeriod time.Duration
grpcServiceConfig string
grpcDialOptions []grpc.DialOption
serviceConfig string
dialOptions []grpc.DialOption
headers map[string]string
clientCredentials credentials.TransportCredentials
}
type GRPCConnectionOption func(cfg *grpcConnectionConfig)
// Option applies an option to the gRPC driver.
type Option func(cfg *config)
// WithInsecure disables client transport security for the exporter's gRPC connection
// just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure
// does. Note, by default, client security is required unless WithInsecure is used.
func WithInsecure() GRPCConnectionOption {
return func(cfg *grpcConnectionConfig) {
func WithInsecure() Option {
return func(cfg *config) {
cfg.canDialInsecure = true
}
}
@ -85,16 +85,16 @@ func WithInsecure() GRPCConnectionOption {
// WithEndpoint allows one to set the endpoint that the exporter will
// connect to the collector on. If unset, it will instead try to use
// connect to DefaultCollectorHost:DefaultCollectorPort.
func WithEndpoint(endpoint string) GRPCConnectionOption {
return func(cfg *grpcConnectionConfig) {
func WithEndpoint(endpoint string) Option {
return func(cfg *config) {
cfg.collectorEndpoint = endpoint
}
}
// WithReconnectionPeriod allows one to set the delay between next connection attempt
// after failing to connect with the collector.
func WithReconnectionPeriod(rp time.Duration) GRPCConnectionOption {
return func(cfg *grpcConnectionConfig) {
func WithReconnectionPeriod(rp time.Duration) Option {
return func(cfg *config) {
cfg.reconnectionPeriod = rp
}
}
@ -104,15 +104,15 @@ func WithReconnectionPeriod(rp time.Duration) GRPCConnectionOption {
// with google.golang.org/grpc/encoding. This can be done by encoding.RegisterCompressor. Some
// compressors auto-register on import, such as gzip, which can be registered by calling
// `import _ "google.golang.org/grpc/encoding/gzip"`
func WithCompressor(compressor string) GRPCConnectionOption {
return func(cfg *grpcConnectionConfig) {
func WithCompressor(compressor string) Option {
return func(cfg *config) {
cfg.compressor = compressor
}
}
// WithHeaders will send the provided headers with gRPC requests
func WithHeaders(headers map[string]string) GRPCConnectionOption {
return func(cfg *grpcConnectionConfig) {
func WithHeaders(headers map[string]string) Option {
return func(cfg *config) {
cfg.headers = headers
}
}
@ -122,24 +122,24 @@ func WithHeaders(headers map[string]string) GRPCConnectionOption {
// of say a Certificate file or a tls.Certificate, because the retrieving
// these credentials can be done in many ways e.g. plain file, in code tls.Config
// or by certificate rotation, so it is up to the caller to decide what to use.
func WithTLSCredentials(creds credentials.TransportCredentials) GRPCConnectionOption {
return func(cfg *grpcConnectionConfig) {
func WithTLSCredentials(creds credentials.TransportCredentials) Option {
return func(cfg *config) {
cfg.clientCredentials = creds
}
}
// WithGRPCServiceConfig defines the default gRPC service config used.
func WithGRPCServiceConfig(serviceConfig string) GRPCConnectionOption {
return func(cfg *grpcConnectionConfig) {
cfg.grpcServiceConfig = serviceConfig
// WithServiceConfig defines the default gRPC service config used.
func WithServiceConfig(serviceConfig string) Option {
return func(cfg *config) {
cfg.serviceConfig = serviceConfig
}
}
// WithGRPCDialOption opens support to any grpc.DialOption to be used. If it conflicts
// WithDialOption opens support to any grpc.DialOption to be used. If it conflicts
// with some other configuration the GRPC specified via the collector the ones here will
// take preference since they are set last.
func WithGRPCDialOption(opts ...grpc.DialOption) GRPCConnectionOption {
return func(cfg *grpcConnectionConfig) {
cfg.grpcDialOptions = opts
func WithDialOption(opts ...grpc.DialOption) Option {
return func(cfg *config) {
cfg.dialOptions = opts
}
}

View File

@ -0,0 +1,498 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpgrpc_test
import (
"context"
"fmt"
"net"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
"go.opentelemetry.io/otel/exporters/otlp"
commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1"
"go.opentelemetry.io/otel/exporters/otlp/internal/otlptest"
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
"go.opentelemetry.io/otel/label"
exporttrace "go.opentelemetry.io/otel/sdk/export/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
func TestNewExporter_endToEnd(t *testing.T) {
tests := []struct {
name string
additionalOpts []otlpgrpc.Option
}{
{
name: "StandardExporter",
},
{
name: "WithCompressor",
additionalOpts: []otlpgrpc.Option{
otlpgrpc.WithCompressor(gzip.Name),
},
},
{
name: "WithServiceConfig",
additionalOpts: []otlpgrpc.Option{
otlpgrpc.WithServiceConfig("{}"),
},
},
{
name: "WithDialOptions",
additionalOpts: []otlpgrpc.Option{
otlpgrpc.WithDialOption(grpc.WithBlock()),
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
newExporterEndToEndTest(t, test.additionalOpts)
})
}
}
func newGRPCExporter(t *testing.T, ctx context.Context, endpoint string, additionalOpts ...otlpgrpc.Option) *otlp.Exporter {
opts := []otlpgrpc.Option{
otlpgrpc.WithInsecure(),
otlpgrpc.WithEndpoint(endpoint),
otlpgrpc.WithReconnectionPeriod(50 * time.Millisecond),
}
opts = append(opts, additionalOpts...)
driver := otlpgrpc.NewDriver(opts...)
exp, err := otlp.NewExporter(ctx, driver)
if err != nil {
t.Fatalf("failed to create a new collector exporter: %v", err)
}
return exp
}
func newExporterEndToEndTest(t *testing.T, additionalOpts []otlpgrpc.Option) {
mc := runMockCollectorAtEndpoint(t, "localhost:56561")
defer func() {
_ = mc.stop()
}()
<-time.After(5 * time.Millisecond)
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint, additionalOpts...)
defer func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
panic(err)
}
}()
otlptest.RunEndToEndTest(ctx, t, exp, mc, mc)
}
func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
mc := runMockCollector(t)
defer func() {
_ = mc.stop()
}()
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint)
defer func() {
if err := exp.Shutdown(ctx); err != nil {
panic(err)
}
}()
// Invoke Start numerous times, should return errAlreadyStarted
for i := 0; i < 10; i++ {
if err := exp.Start(ctx); err == nil || !strings.Contains(err.Error(), "already started") {
t.Fatalf("#%d unexpected Start error: %v", i, err)
}
}
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf("failed to Shutdown the exporter: %v", err)
}
// Invoke Shutdown numerous times
for i := 0; i < 10; i++ {
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf(`#%d got error (%v) expected none`, i, err)
}
}
}
func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
mc := runMockCollector(t)
reconnectionPeriod := 20 * time.Millisecond
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlpgrpc.WithReconnectionPeriod(reconnectionPeriod))
defer func() {
_ = exp.Shutdown(ctx)
}()
// We'll now stop the collector right away to simulate a connection
// dying in the midst of communication or even not existing before.
_ = mc.stop()
// In the test below, we'll stop the collector many times,
// while exporting traces and test to ensure that we can
// reconnect.
for j := 0; j < 3; j++ {
// No endpoint up.
require.Error(
t,
exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused",
mc.endpoint,
)
// Now resurrect the collector by making a new one but reusing the
// old endpoint, and the collector should reconnect automatically.
nmc := runMockCollectorAtEndpoint(t, mc.endpoint)
// Give the exporter sometime to reconnect
<-time.After(reconnectionPeriod * 4)
n := 10
for i := 0; i < n; i++ {
require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "Resurrected"}}))
}
nmaSpans := nmc.getSpans()
// Expecting 10 SpanSnapshots that were sampled, given that
if g, w := len(nmaSpans), n; g != w {
t.Fatalf("Round #%d: Connected collector: spans: got %d want %d", j, g, w)
}
dSpans := mc.getSpans()
// Expecting 0 spans to have been received by the original but now dead collector
if g, w := len(dSpans), 0; g != w {
t.Fatalf("Round #%d: Disconnected collector: spans: got %d want %d", j, g, w)
}
_ = nmc.stop()
}
}
// This test takes a long time to run: to skip it, run tests using: -short
func TestNewExporter_collectorOnBadConnection(t *testing.T) {
if testing.Short() {
t.Skipf("Skipping this long running test")
}
ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to grab an available port: %v", err)
}
// Firstly close the "collector's" channel: optimistically this endpoint won't get reused ASAP
// However, our goal of closing it is to simulate an unavailable connection
_ = ln.Close()
_, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String())
endpoint := fmt.Sprintf("localhost:%s", collectorPortStr)
ctx := context.Background()
exp := newGRPCExporter(t, ctx, endpoint)
_ = exp.Shutdown(ctx)
}
func TestNewExporter_withEndpoint(t *testing.T) {
mc := runMockCollector(t)
defer func() {
_ = mc.stop()
}()
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint)
_ = exp.Shutdown(ctx)
}
func TestNewExporter_withHeaders(t *testing.T) {
mc := runMockCollector(t)
defer func() {
_ = mc.stop()
}()
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlpgrpc.WithHeaders(map[string]string{"header1": "value1"}))
require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}}))
defer func() {
_ = exp.Shutdown(ctx)
}()
headers := mc.getHeaders()
require.Len(t, headers.Get("header1"), 1)
assert.Equal(t, "value1", headers.Get("header1")[0])
}
func TestNewExporter_withMultipleAttributeTypes(t *testing.T) {
mc := runMockCollector(t)
defer func() {
_ = mc.stop()
}()
<-time.After(5 * time.Millisecond)
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint)
defer func() {
_ = exp.Shutdown(ctx)
}()
tp := sdktrace.NewTracerProvider(
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithBatcher(
exp,
// add following two options to ensure flush
sdktrace.WithBatchTimeout(5),
sdktrace.WithMaxExportBatchSize(10),
),
)
defer func() { _ = tp.Shutdown(ctx) }()
tr := tp.Tracer("test-tracer")
testKvs := []label.KeyValue{
label.Int("Int", 1),
label.Int32("Int32", int32(2)),
label.Int64("Int64", int64(3)),
label.Float32("Float32", float32(1.11)),
label.Float64("Float64", 2.22),
label.Bool("Bool", true),
label.String("String", "test"),
}
_, span := tr.Start(ctx, "AlwaysSample")
span.SetAttributes(testKvs...)
span.End()
// Flush and close.
func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := tp.Shutdown(ctx); err != nil {
t.Fatalf("failed to shut down a tracer provider: %v", err)
}
}()
// Wait >2 cycles.
<-time.After(40 * time.Millisecond)
// Now shutdown the exporter
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf("failed to stop the exporter: %v", err)
}
// Shutdown the collector too so that we can begin
// verification checks of expected data back.
_ = mc.stop()
// Now verify that we only got one span
rss := mc.getSpans()
if got, want := len(rss), 1; got != want {
t.Fatalf("resource span count: got %d, want %d\n", got, want)
}
expected := []*commonpb.KeyValue{
{
Key: "Int",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_IntValue{
IntValue: 1,
},
},
},
{
Key: "Int32",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_IntValue{
IntValue: 2,
},
},
},
{
Key: "Int64",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_IntValue{
IntValue: 3,
},
},
},
{
Key: "Float32",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_DoubleValue{
DoubleValue: 1.11,
},
},
},
{
Key: "Float64",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_DoubleValue{
DoubleValue: 2.22,
},
},
},
{
Key: "Bool",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_BoolValue{
BoolValue: true,
},
},
},
{
Key: "String",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_StringValue{
StringValue: "test",
},
},
},
}
// Verify attributes
if !assert.Len(t, rss[0].Attributes, len(expected)) {
t.Fatalf("attributes count: got %d, want %d\n", len(rss[0].Attributes), len(expected))
}
for i, actual := range rss[0].Attributes {
if a, ok := actual.Value.Value.(*commonpb.AnyValue_DoubleValue); ok {
e, ok := expected[i].Value.Value.(*commonpb.AnyValue_DoubleValue)
if !ok {
t.Errorf("expected AnyValue_DoubleValue, got %T", expected[i].Value.Value)
continue
}
if !assert.InDelta(t, e.DoubleValue, a.DoubleValue, 0.01) {
continue
}
e.DoubleValue = a.DoubleValue
}
assert.Equal(t, expected[i], actual)
}
}
func TestDisconnected(t *testing.T) {
ctx := context.Background()
// The endpoint is whatever, we want to be disconnected. But we
// setting a blocking connection, so dialing to the invalid
// endpoint actually fails.
exp := newGRPCExporter(t, ctx, "invalid",
otlpgrpc.WithReconnectionPeriod(time.Hour),
otlpgrpc.WithDialOption(
grpc.WithBlock(),
grpc.FailOnNonTempDialError(true),
),
)
defer func() {
assert.NoError(t, exp.Shutdown(ctx))
}()
assert.Error(t, exp.Export(ctx, otlptest.OneRecordCheckpointSet{}))
assert.Error(t, exp.ExportSpans(ctx, otlptest.SingleSpanSnapshot()))
}
func TestEmptyData(t *testing.T) {
mc := runMockCollectorAtEndpoint(t, "localhost:56561")
defer func() {
_ = mc.stop()
}()
<-time.After(5 * time.Millisecond)
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint)
defer func() {
assert.NoError(t, exp.Shutdown(ctx))
}()
assert.NoError(t, exp.ExportSpans(ctx, nil))
assert.NoError(t, exp.Export(ctx, otlptest.EmptyCheckpointSet{}))
}
func TestFailedMetricTransform(t *testing.T) {
mc := runMockCollectorAtEndpoint(t, "localhost:56561")
defer func() {
_ = mc.stop()
}()
<-time.After(5 * time.Millisecond)
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint)
defer func() {
assert.NoError(t, exp.Shutdown(ctx))
}()
assert.Error(t, exp.Export(ctx, otlptest.FailCheckpointSet{}))
}
func TestMultiConnectionDriver(t *testing.T) {
mcTraces := runMockCollector(t)
mcMetrics := runMockCollector(t)
defer func() {
_ = mcTraces.stop()
_ = mcMetrics.stop()
}()
<-time.After(5 * time.Millisecond)
commonOpts := []otlpgrpc.Option{
otlpgrpc.WithInsecure(),
otlpgrpc.WithReconnectionPeriod(50 * time.Millisecond),
otlpgrpc.WithDialOption(grpc.WithBlock()),
}
optsTraces := append([]otlpgrpc.Option{
otlpgrpc.WithEndpoint(mcTraces.endpoint),
}, commonOpts...)
optsMetrics := append([]otlpgrpc.Option{
otlpgrpc.WithEndpoint(mcMetrics.endpoint),
}, commonOpts...)
tracesDriver := otlpgrpc.NewDriver(optsTraces...)
metricsDriver := otlpgrpc.NewDriver(optsMetrics...)
splitCfg := otlp.SplitConfig{
ForMetrics: metricsDriver,
ForTraces: tracesDriver,
}
driver := otlp.NewSplitDriver(splitCfg)
ctx := context.Background()
exp, err := otlp.NewExporter(ctx, driver)
if err != nil {
t.Fatalf("failed to create a new collector exporter: %v", err)
}
defer func() {
assert.NoError(t, exp.Shutdown(ctx))
}()
otlptest.RunEndToEndTest(ctx, t, exp, mcTraces, mcMetrics)
}

View File

@ -0,0 +1,92 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlphttp_test
import (
"bytes"
"crypto/ecdsa"
"crypto/elliptic"
cryptorand "crypto/rand"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"math/big"
mathrand "math/rand"
"net"
"time"
)
type mathRandReader struct{}
func (mathRandReader) Read(p []byte) (n int, err error) {
return mathrand.Read(p)
}
var randReader mathRandReader
type pemCertificate struct {
Certificate []byte
PrivateKey []byte
}
// Based on https://golang.org/src/crypto/tls/generate_cert.go,
// simplified and weakened.
func generateWeakCertificate() (*pemCertificate, error) {
priv, err := ecdsa.GenerateKey(elliptic.P256(), randReader)
if err != nil {
return nil, err
}
keyUsage := x509.KeyUsageDigitalSignature
notBefore := time.Now()
notAfter := notBefore.Add(time.Hour)
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128)
serialNumber, err := cryptorand.Int(randReader, serialNumberLimit)
if err != nil {
return nil, err
}
template := x509.Certificate{
SerialNumber: serialNumber,
Subject: pkix.Name{
Organization: []string{"otel-go"},
},
NotBefore: notBefore,
NotAfter: notAfter,
KeyUsage: keyUsage,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
BasicConstraintsValid: true,
DNSNames: []string{"localhost"},
IPAddresses: []net.IP{net.IPv6loopback, net.IPv4(127, 0, 0, 1)},
}
derBytes, err := x509.CreateCertificate(randReader, &template, &template, &priv.PublicKey, priv)
if err != nil {
return nil, err
}
certificateBuffer := new(bytes.Buffer)
if err := pem.Encode(certificateBuffer, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}); err != nil {
return nil, err
}
privDERBytes, err := x509.MarshalPKCS8PrivateKey(priv)
if err != nil {
return nil, err
}
privBuffer := new(bytes.Buffer)
if err := pem.Encode(privBuffer, &pem.Block{Type: "PRIVATE KEY", Bytes: privDERBytes}); err != nil {
return nil, err
}
return &pemCertificate{
Certificate: certificateBuffer.Bytes(),
PrivateKey: privBuffer.Bytes(),
}, nil
}

View File

@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Package otlphttp implements a protocol driver that sends traces and
metrics to the collector using HTTP with binary protobuf payloads.
This package is currently in a pre-GA phase. Backwards incompatible
changes may be introduced in subsequent minor version releases as we
work to track the evolving OpenTelemetry specification and user
feedback.
*/
package otlphttp // import "go.opentelemetry.io/otel/exporters/otlp/otlphttp"

View File

@ -0,0 +1,291 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlphttp
import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
"net/http"
"path"
"strings"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp"
colmetricspb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1"
"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
)
const contentType = "application/x-protobuf"
// Keep it in sync with golang's DefaultTransport from net/http! We
// have our own copy to avoid handling a situation where the
// DefaultTransport is overwritten with some different implementation
// of http.RoundTripper or it's modified by other package.
var ourTransport *http.Transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
type driver struct {
client *http.Client
cfg config
stopCh chan struct{}
}
var _ otlp.ProtocolDriver = (*driver)(nil)
// NewDriver creates a new HTTP driver.
func NewDriver(opts ...Option) otlp.ProtocolDriver {
cfg := config{
endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort),
compression: NoCompression,
tracesURLPath: DefaultTracesPath,
metricsURLPath: DefaultMetricsPath,
maxAttempts: DefaultMaxAttempts,
backoff: DefaultBackoff,
}
for _, opt := range opts {
opt.Apply(&cfg)
}
for pathPtr, defaultPath := range map[*string]string{
&cfg.tracesURLPath: DefaultTracesPath,
&cfg.metricsURLPath: DefaultMetricsPath,
} {
tmp := strings.TrimSpace(*pathPtr)
if tmp == "" {
tmp = defaultPath
} else {
tmp = path.Clean(tmp)
if !path.IsAbs(tmp) {
tmp = fmt.Sprintf("/%s", tmp)
}
}
*pathPtr = tmp
}
if cfg.maxAttempts <= 0 {
cfg.maxAttempts = DefaultMaxAttempts
}
if cfg.maxAttempts > DefaultMaxAttempts {
cfg.maxAttempts = DefaultMaxAttempts
}
if cfg.backoff <= 0 {
cfg.backoff = DefaultBackoff
}
client := &http.Client{
Transport: ourTransport,
}
if cfg.tlsCfg != nil {
transport := ourTransport.Clone()
transport.TLSClientConfig = cfg.tlsCfg
client.Transport = transport
}
return &driver{
client: client,
cfg: cfg,
stopCh: make(chan struct{}),
}
}
// Start implements otlp.ProtocolDriver.
func (d *driver) Start(ctx context.Context) error {
// nothing to do
return nil
}
// Stop implements otlp.ProtocolDriver.
func (d *driver) Stop(ctx context.Context) error {
close(d.stopCh)
return nil
}
// ExportMetrics implements otlp.ProtocolDriver.
func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error {
rms, err := transform.CheckpointSet(ctx, selector, cps, 1)
if err != nil {
return err
}
if len(rms) == 0 {
return nil
}
pbRequest := &colmetricspb.ExportMetricsServiceRequest{
ResourceMetrics: rms,
}
rawRequest, err := pbRequest.Marshal()
if err != nil {
return err
}
return d.send(ctx, rawRequest, d.cfg.metricsURLPath)
}
// ExportTraces implements otlp.ProtocolDriver.
func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error {
protoSpans := transform.SpanData(ss)
if len(protoSpans) == 0 {
return nil
}
pbRequest := &coltracepb.ExportTraceServiceRequest{
ResourceSpans: protoSpans,
}
rawRequest, err := pbRequest.Marshal()
if err != nil {
return err
}
return d.send(ctx, rawRequest, d.cfg.tracesURLPath)
}
func (d *driver) send(ctx context.Context, rawRequest []byte, urlPath string) error {
address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.endpoint, urlPath)
var cancel context.CancelFunc
ctx, cancel = d.contextWithStop(ctx)
defer cancel()
for i := 0; i < d.cfg.maxAttempts; i++ {
response, err := d.singleSend(ctx, rawRequest, address)
if err != nil {
return err
}
// We don't care about the body, so try to read it
// into /dev/null and close it immediately. The
// reading part is to facilitate connection reuse.
_, _ = io.Copy(ioutil.Discard, response.Body)
_ = response.Body.Close()
switch response.StatusCode {
case http.StatusOK:
return nil
case http.StatusTooManyRequests:
fallthrough
case http.StatusServiceUnavailable:
select {
case <-time.After(getWaitDuration(d.cfg.backoff, i)):
continue
case <-ctx.Done():
return ctx.Err()
}
default:
return fmt.Errorf("Failed with HTTP status %s", response.Status)
}
}
return fmt.Errorf("Failed to send data to %s after %d tries", address, d.cfg.maxAttempts)
}
func (d *driver) getScheme() string {
if d.cfg.insecure {
return "http"
}
return "https"
}
func getWaitDuration(backoff time.Duration, i int) time.Duration {
// Strategy: after nth failed attempt, attempt resending after
// k * initialBackoff + jitter, where k is a random number in
// range [0, 2^n-1), and jitter is a random percentage of
// initialBackoff from [-5%, 5%).
//
// Based on
// https://en.wikipedia.org/wiki/Exponential_backoff#Example_exponential_backoff_algorithm
//
// Jitter is our addition.
// There won't be an overflow, since i is capped to
// DefaultMaxAttempts (5).
upperK := (int64)(1) << (i + 1)
jitterPercent := (rand.Float64() - 0.5) / 10.
jitter := jitterPercent * (float64)(backoff)
k := rand.Int63n(upperK)
return (time.Duration)(k)*backoff + (time.Duration)(jitter)
}
func (d *driver) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) {
// Unify the parent context Done signal with the driver's stop
// channel.
ctx, cancel := context.WithCancel(ctx)
go func(ctx context.Context, cancel context.CancelFunc) {
select {
case <-ctx.Done():
// Nothing to do, either cancelled or deadline
// happened.
case <-d.stopCh:
cancel()
}
}(ctx, cancel)
return ctx, cancel
}
func (d *driver) singleSend(ctx context.Context, rawRequest []byte, address string) (*http.Response, error) {
request, err := http.NewRequestWithContext(ctx, http.MethodPost, address, nil)
if err != nil {
return nil, err
}
bodyReader, contentLength, headers := d.prepareBody(rawRequest)
// Not closing bodyReader through defer, the HTTP Client's
// Transport will do it for us
request.Body = bodyReader
request.ContentLength = contentLength
for key, values := range headers {
for _, value := range values {
request.Header.Add(key, value)
}
}
return d.client.Do(request)
}
func (d *driver) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Header) {
var bodyReader io.ReadCloser
headers := http.Header{}
for k, v := range d.cfg.headers {
headers.Set(k, v)
}
contentLength := (int64)(len(rawRequest))
headers.Set("Content-Type", contentType)
requestReader := bytes.NewBuffer(rawRequest)
switch d.cfg.compression {
case NoCompression:
bodyReader = ioutil.NopCloser(requestReader)
case GzipCompression:
preader, pwriter := io.Pipe()
go func() {
defer pwriter.Close()
gzipper := gzip.NewWriter(pwriter)
defer gzipper.Close()
_, err := io.Copy(gzipper, requestReader)
if err != nil {
otel.Handle(fmt.Errorf("otlphttp: failed to gzip request: %v", err))
}
}()
headers.Set("Content-Encoding", "gzip")
bodyReader = preader
contentLength = -1
}
return bodyReader, contentLength, headers
}

View File

@ -0,0 +1,418 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlphttp_test
import (
"context"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/exporters/otlp"
"go.opentelemetry.io/otel/exporters/otlp/internal/otlptest"
"go.opentelemetry.io/otel/exporters/otlp/otlphttp"
)
const (
relOtherMetricsPath = "post/metrics/here"
relOtherTracesPath = "post/traces/here"
otherMetricsPath = "/post/metrics/here"
otherTracesPath = "/post/traces/here"
)
var (
testHeaders = map[string]string{
"Otel-Go-Key-1": "somevalue",
"Otel-Go-Key-2": "someothervalue",
}
)
func TestEndToEnd(t *testing.T) {
tests := []struct {
name string
opts []otlphttp.Option
mcCfg mockCollectorConfig
tls bool
}{
{
name: "no extra options",
opts: nil,
},
{
name: "with gzip compression",
opts: []otlphttp.Option{
otlphttp.WithCompression(otlphttp.GzipCompression),
},
},
{
name: "with empty paths (forced to defaults)",
opts: []otlphttp.Option{
otlphttp.WithMetricsURLPath(""),
otlphttp.WithTracesURLPath(""),
},
},
{
name: "with different paths",
opts: []otlphttp.Option{
otlphttp.WithMetricsURLPath(otherMetricsPath),
otlphttp.WithTracesURLPath(otherTracesPath),
},
mcCfg: mockCollectorConfig{
MetricsURLPath: otherMetricsPath,
TracesURLPath: otherTracesPath,
},
},
{
name: "with relative paths",
opts: []otlphttp.Option{
otlphttp.WithMetricsURLPath(relOtherMetricsPath),
otlphttp.WithTracesURLPath(relOtherTracesPath),
},
mcCfg: mockCollectorConfig{
MetricsURLPath: otherMetricsPath,
TracesURLPath: otherTracesPath,
},
},
{
name: "with TLS",
opts: nil,
mcCfg: mockCollectorConfig{
WithTLS: true,
},
tls: true,
},
{
name: "with extra headers",
opts: []otlphttp.Option{
otlphttp.WithHeaders(testHeaders),
},
mcCfg: mockCollectorConfig{
ExpectedHeaders: testHeaders,
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mc := runMockCollector(t, tc.mcCfg)
defer mc.MustStop(t)
allOpts := []otlphttp.Option{
otlphttp.WithEndpoint(mc.Endpoint()),
}
if tc.tls {
tlsConfig := mc.ClientTLSConfig()
require.NotNil(t, tlsConfig)
allOpts = append(allOpts, otlphttp.WithTLSClientConfig(tlsConfig))
} else {
allOpts = append(allOpts, otlphttp.WithInsecure())
}
allOpts = append(allOpts, tc.opts...)
driver := otlphttp.NewDriver(allOpts...)
ctx := context.Background()
exporter, err := otlp.NewExporter(ctx, driver)
if assert.NoError(t, err) {
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
otlptest.RunEndToEndTest(ctx, t, exporter, mc, mc)
}
})
}
}
func TestRetry(t *testing.T) {
statuses := []int{
http.StatusTooManyRequests,
http.StatusServiceUnavailable,
}
mcCfg := mockCollectorConfig{
InjectHTTPStatus: statuses,
}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlphttp.NewDriver(
otlphttp.WithEndpoint(mc.Endpoint()),
otlphttp.WithInsecure(),
otlphttp.WithMaxAttempts(len(statuses)+1),
)
ctx := context.Background()
exporter, err := otlp.NewExporter(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot())
assert.NoError(t, err)
assert.Len(t, mc.GetSpans(), 1)
}
func TestRetryFailed(t *testing.T) {
statuses := []int{
http.StatusTooManyRequests,
http.StatusServiceUnavailable,
}
mcCfg := mockCollectorConfig{
InjectHTTPStatus: statuses,
}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlphttp.NewDriver(
otlphttp.WithEndpoint(mc.Endpoint()),
otlphttp.WithInsecure(),
otlphttp.WithMaxAttempts(1),
)
ctx := context.Background()
exporter, err := otlp.NewExporter(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot())
assert.Error(t, err)
assert.Empty(t, mc.GetSpans())
}
func TestNoRetry(t *testing.T) {
statuses := []int{
http.StatusBadRequest,
}
mcCfg := mockCollectorConfig{
InjectHTTPStatus: statuses,
}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlphttp.NewDriver(
otlphttp.WithEndpoint(mc.Endpoint()),
otlphttp.WithInsecure(),
otlphttp.WithMaxAttempts(len(statuses)+1),
)
ctx := context.Background()
exporter, err := otlp.NewExporter(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot())
assert.Error(t, err)
assert.Empty(t, mc.GetSpans())
}
func TestFailedCheckpoint(t *testing.T) {
mcCfg := mockCollectorConfig{}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlphttp.NewDriver(
otlphttp.WithEndpoint(mc.Endpoint()),
otlphttp.WithInsecure(),
)
ctx := context.Background()
exporter, err := otlp.NewExporter(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
err = exporter.Export(ctx, otlptest.FailCheckpointSet{})
assert.Error(t, err)
assert.Empty(t, mc.GetMetrics())
}
func TestEmptyData(t *testing.T) {
mcCfg := mockCollectorConfig{}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlphttp.NewDriver(
otlphttp.WithEndpoint(mc.Endpoint()),
otlphttp.WithInsecure(),
)
ctx := context.Background()
exporter, err := otlp.NewExporter(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
err = exporter.Export(ctx, otlptest.EmptyCheckpointSet{})
assert.NoError(t, err)
err = exporter.ExportSpans(ctx, nil)
assert.NoError(t, err)
assert.Empty(t, mc.GetMetrics())
assert.Empty(t, mc.GetSpans())
}
func TestUnreasonableMaxAttempts(t *testing.T) {
// Max attempts is 5, we set collector to fail 7 times and try
// to configure max attempts to be either negative or too
// large. Since we set max attempts to 5 in such cases,
// exporting to the collector should fail.
type testcase struct {
name string
maxAttempts int
}
for _, tc := range []testcase{
{
name: "negative max attempts",
maxAttempts: -3,
},
{
name: "too large max attempts",
maxAttempts: 10,
},
} {
t.Run(tc.name, func(t *testing.T) {
statuses := make([]int, 0, 7)
for i := 0; i < cap(statuses); i++ {
statuses = append(statuses, http.StatusTooManyRequests)
}
mcCfg := mockCollectorConfig{
InjectHTTPStatus: statuses,
}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlphttp.NewDriver(
otlphttp.WithEndpoint(mc.Endpoint()),
otlphttp.WithInsecure(),
otlphttp.WithMaxAttempts(tc.maxAttempts),
otlphttp.WithBackoff(time.Millisecond),
)
ctx := context.Background()
exporter, err := otlp.NewExporter(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot())
assert.Error(t, err)
assert.Empty(t, mc.GetSpans())
})
}
}
func TestUnreasonableBackoff(t *testing.T) {
// This sets backoff to negative value, which gets corrected
// to default backoff instead of being used. Default max
// attempts is 5, so we set the collector to fail 4 times, but
// we set the deadline to 3 times of the default backoff, so
// this should show that deadline is not met, meaning that the
// retries weren't immediate (as negative backoff could
// imply).
statuses := make([]int, 0, 4)
for i := 0; i < cap(statuses); i++ {
statuses = append(statuses, http.StatusTooManyRequests)
}
mcCfg := mockCollectorConfig{
InjectHTTPStatus: statuses,
}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlphttp.NewDriver(
otlphttp.WithEndpoint(mc.Endpoint()),
otlphttp.WithInsecure(),
otlphttp.WithBackoff(-time.Millisecond),
)
ctx, cancel := context.WithTimeout(context.Background(), 3*otlphttp.DefaultBackoff)
defer cancel()
exporter, err := otlp.NewExporter(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot())
assert.Error(t, err)
assert.Empty(t, mc.GetSpans())
}
func TestCancelledContext(t *testing.T) {
mcCfg := mockCollectorConfig{}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlphttp.NewDriver(
otlphttp.WithEndpoint(mc.Endpoint()),
otlphttp.WithInsecure(),
)
ctx, cancel := context.WithCancel(context.Background())
exporter, err := otlp.NewExporter(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
cancel()
err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot())
assert.Error(t, err)
assert.Empty(t, mc.GetSpans())
}
func TestDeadlineContext(t *testing.T) {
statuses := make([]int, 0, 5)
for i := 0; i < cap(statuses); i++ {
statuses = append(statuses, http.StatusTooManyRequests)
}
mcCfg := mockCollectorConfig{
InjectHTTPStatus: statuses,
}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlphttp.NewDriver(
otlphttp.WithEndpoint(mc.Endpoint()),
otlphttp.WithInsecure(),
otlphttp.WithBackoff(time.Minute),
)
ctx := context.Background()
exporter, err := otlp.NewExporter(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot())
assert.Error(t, err)
assert.Empty(t, mc.GetSpans())
}
func TestStopWhileExporting(t *testing.T) {
statuses := make([]int, 0, 5)
for i := 0; i < cap(statuses); i++ {
statuses = append(statuses, http.StatusTooManyRequests)
}
mcCfg := mockCollectorConfig{
InjectHTTPStatus: statuses,
}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlphttp.NewDriver(
otlphttp.WithEndpoint(mc.Endpoint()),
otlphttp.WithInsecure(),
otlphttp.WithBackoff(time.Minute),
)
ctx := context.Background()
exporter, err := otlp.NewExporter(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
doneCh := make(chan struct{})
go func() {
err := exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot())
assert.Error(t, err)
assert.Empty(t, mc.GetSpans())
close(doneCh)
}()
<-time.After(time.Second)
err = exporter.Shutdown(ctx)
assert.NoError(t, err)
<-doneCh
}

View File

@ -0,0 +1,272 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlphttp_test
import (
"bytes"
"compress/gzip"
"context"
"crypto/tls"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
collectormetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
collectortracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1"
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1"
"go.opentelemetry.io/otel/exporters/otlp/internal/otlptest"
"go.opentelemetry.io/otel/exporters/otlp/otlphttp"
)
type mockCollector struct {
endpoint string
server *http.Server
spanLock sync.Mutex
spansStorage otlptest.SpansStorage
metricLock sync.Mutex
metricsStorage otlptest.MetricsStorage
injectHTTPStatus []int
injectContentType string
clientTLSConfig *tls.Config
expectedHeaders map[string]string
}
func (c *mockCollector) Stop() error {
return c.server.Shutdown(context.Background())
}
func (c *mockCollector) MustStop(t *testing.T) {
assert.NoError(t, c.server.Shutdown(context.Background()))
}
func (c *mockCollector) GetSpans() []*tracepb.Span {
c.spanLock.Lock()
defer c.spanLock.Unlock()
return c.spansStorage.GetSpans()
}
func (c *mockCollector) GetResourceSpans() []*tracepb.ResourceSpans {
c.spanLock.Lock()
defer c.spanLock.Unlock()
return c.spansStorage.GetResourceSpans()
}
func (c *mockCollector) GetMetrics() []*metricpb.Metric {
c.metricLock.Lock()
defer c.metricLock.Unlock()
return c.metricsStorage.GetMetrics()
}
func (c *mockCollector) Endpoint() string {
return c.endpoint
}
func (c *mockCollector) ClientTLSConfig() *tls.Config {
return c.clientTLSConfig
}
func (c *mockCollector) serveMetrics(w http.ResponseWriter, r *http.Request) {
if !c.checkHeaders(r) {
w.WriteHeader(http.StatusBadRequest)
return
}
response := collectormetricpb.ExportMetricsServiceResponse{}
rawResponse, err := response.Marshal()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if injectedStatus := c.getInjectHTTPStatus(); injectedStatus != 0 {
writeReply(w, rawResponse, injectedStatus, c.injectContentType)
return
}
rawRequest, err := readRequest(r)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
request := collectormetricpb.ExportMetricsServiceRequest{}
if err := request.Unmarshal(rawRequest); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
writeReply(w, rawResponse, 0, c.injectContentType)
c.metricLock.Lock()
defer c.metricLock.Unlock()
c.metricsStorage.AddMetrics(&request)
}
func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) {
if !c.checkHeaders(r) {
w.WriteHeader(http.StatusBadRequest)
return
}
response := collectortracepb.ExportTraceServiceResponse{}
rawResponse, err := response.Marshal()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if injectedStatus := c.getInjectHTTPStatus(); injectedStatus != 0 {
writeReply(w, rawResponse, injectedStatus, c.injectContentType)
return
}
rawRequest, err := readRequest(r)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
request := collectortracepb.ExportTraceServiceRequest{}
if err := request.Unmarshal(rawRequest); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
writeReply(w, rawResponse, 0, c.injectContentType)
c.spanLock.Lock()
defer c.spanLock.Unlock()
c.spansStorage.AddSpans(&request)
}
func (c *mockCollector) checkHeaders(r *http.Request) bool {
for k, v := range c.expectedHeaders {
got := r.Header.Get(k)
if got != v {
return false
}
}
return true
}
func (c *mockCollector) getInjectHTTPStatus() int {
if len(c.injectHTTPStatus) == 0 {
return 0
}
status := c.injectHTTPStatus[0]
c.injectHTTPStatus = c.injectHTTPStatus[1:]
if len(c.injectHTTPStatus) == 0 {
c.injectHTTPStatus = nil
}
return status
}
func readRequest(r *http.Request) ([]byte, error) {
if r.Header.Get("Content-Encoding") == "gzip" {
return readGzipBody(r.Body)
}
return ioutil.ReadAll(r.Body)
}
func readGzipBody(body io.Reader) ([]byte, error) {
rawRequest := bytes.Buffer{}
gunzipper, err := gzip.NewReader(body)
if err != nil {
return nil, err
}
defer gunzipper.Close()
_, err = io.Copy(&rawRequest, gunzipper)
if err != nil {
return nil, err
}
return rawRequest.Bytes(), nil
}
func writeReply(w http.ResponseWriter, rawResponse []byte, injectHTTPStatus int, injectContentType string) {
status := http.StatusOK
if injectHTTPStatus != 0 {
status = injectHTTPStatus
}
contentType := "application/x-protobuf"
if injectContentType != "" {
contentType = injectContentType
}
w.Header().Set("Content-Type", contentType)
w.WriteHeader(status)
_, _ = w.Write(rawResponse)
}
type mockCollectorConfig struct {
MetricsURLPath string
TracesURLPath string
Port int
InjectHTTPStatus []int
InjectContentType string
WithTLS bool
ExpectedHeaders map[string]string
}
func (c *mockCollectorConfig) fillInDefaults() {
if c.MetricsURLPath == "" {
c.MetricsURLPath = otlphttp.DefaultMetricsPath
}
if c.TracesURLPath == "" {
c.TracesURLPath = otlphttp.DefaultTracesPath
}
}
func runMockCollector(t *testing.T, cfg mockCollectorConfig) *mockCollector {
cfg.fillInDefaults()
ln, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", cfg.Port))
require.NoError(t, err)
_, portStr, err := net.SplitHostPort(ln.Addr().String())
require.NoError(t, err)
m := &mockCollector{
endpoint: fmt.Sprintf("localhost:%s", portStr),
spansStorage: otlptest.NewSpansStorage(),
metricsStorage: otlptest.NewMetricsStorage(),
injectHTTPStatus: cfg.InjectHTTPStatus,
injectContentType: cfg.InjectContentType,
expectedHeaders: cfg.ExpectedHeaders,
}
mux := http.NewServeMux()
mux.Handle(cfg.MetricsURLPath, http.HandlerFunc(m.serveMetrics))
mux.Handle(cfg.TracesURLPath, http.HandlerFunc(m.serveTraces))
server := &http.Server{
Handler: mux,
}
if cfg.WithTLS {
pem, err := generateWeakCertificate()
require.NoError(t, err)
tlsCertificate, err := tls.X509KeyPair(pem.Certificate, pem.PrivateKey)
require.NoError(t, err)
server.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{tlsCertificate},
}
m.clientTLSConfig = &tls.Config{
InsecureSkipVerify: true,
}
}
go func() {
if cfg.WithTLS {
_ = server.ServeTLS(ln, "", "")
} else {
_ = server.Serve(ln)
}
}()
m.server = server
return m
}

View File

@ -0,0 +1,180 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlphttp
import (
"crypto/tls"
"time"
)
// Compression describes the compression used for payloads sent to the
// collector.
type Compression int
const (
// NoCompression tells the driver to send payloads without
// compression.
NoCompression Compression = iota
// GzipCompression tells the driver to send payloads after
// compressing them with gzip.
GzipCompression
)
const (
// DefaultMaxAttempts describes how many times the driver
// should retry the sending of the payload in case of a
// retryable error.
DefaultMaxAttempts int = 5
// DefaultTracesPath is a default URL path for endpoint that
// receives spans.
DefaultTracesPath string = "/v1/traces"
// DefaultMetricsPath is a default URL path for endpoint that
// receives metrics.
DefaultMetricsPath string = "/v1/metrics"
// DefaultBackoff is a default base backoff time used in the
// exponential backoff strategy.
DefaultBackoff time.Duration = 300 * time.Millisecond
)
type config struct {
endpoint string
compression Compression
tracesURLPath string
metricsURLPath string
maxAttempts int
backoff time.Duration
tlsCfg *tls.Config
insecure bool
headers map[string]string
}
// Option applies an option to the HTTP driver.
type Option interface {
Apply(*config)
}
type endpointOption string
func (o endpointOption) Apply(cfg *config) {
cfg.endpoint = (string)(o)
}
// WithEndpoint allows one to set the address of the collector
// endpoint that the driver will use to send metrics and spans. If
// unset, it will instead try to use
// DefaultCollectorHost:DefaultCollectorPort. Note that the endpoint
// must not contain any URL path.
func WithEndpoint(endpoint string) Option {
return (endpointOption)(endpoint)
}
type compressionOption Compression
func (o compressionOption) Apply(cfg *config) {
cfg.compression = (Compression)(o)
}
// WithCompression tells the driver to compress the sent data.
func WithCompression(compression Compression) Option {
return (compressionOption)(compression)
}
type tracesURLPathOption string
func (o tracesURLPathOption) Apply(cfg *config) {
cfg.tracesURLPath = (string)(o)
}
// WithTracesURLPath allows one to override the default URL path used
// for sending traces. If unset, DefaultTracesPath will be used.
func WithTracesURLPath(urlPath string) Option {
return (tracesURLPathOption)(urlPath)
}
type metricsURLPathOption string
func (o metricsURLPathOption) Apply(cfg *config) {
cfg.metricsURLPath = (string)(o)
}
// WithMetricsURLPath allows one to override the default URL path used
// for sending metrics. If unset, DefaultMetricsPath will be used.
func WithMetricsURLPath(urlPath string) Option {
return (metricsURLPathOption)(urlPath)
}
type maxAttemptsOption int
func (o maxAttemptsOption) Apply(cfg *config) {
cfg.maxAttempts = (int)(o)
}
// WithMaxAttempts allows one to override how many times the driver
// will try to send the payload in case of retryable errors. If unset,
// DefaultMaxAttempts will be used.
func WithMaxAttempts(maxAttempts int) Option {
return maxAttemptsOption(maxAttempts)
}
type backoffOption time.Duration
func (o backoffOption) Apply(cfg *config) {
cfg.backoff = (time.Duration)(o)
}
// WithBackoff tells the driver to use the duration as a base of the
// exponential backoff strategy. If unset, DefaultBackoff will be
// used.
func WithBackoff(duration time.Duration) Option {
return (backoffOption)(duration)
}
type tlsClientConfigOption tls.Config
func (o *tlsClientConfigOption) Apply(cfg *config) {
cfg.tlsCfg = (*tls.Config)(o)
}
// WithTLSClientConfig can be used to set up a custom TLS
// configuration for the client used to send payloads to the
// collector. Use it if you want to use a custom certificate.
func WithTLSClientConfig(tlsCfg *tls.Config) Option {
return (*tlsClientConfigOption)(tlsCfg)
}
type insecureOption struct{}
func (insecureOption) Apply(cfg *config) {
cfg.insecure = true
}
// WithInsecure tells the driver to connect to the collector using the
// HTTP scheme, instead of HTTPS.
func WithInsecure() Option {
return insecureOption{}
}
type headersOption map[string]string
func (o headersOption) Apply(cfg *config) {
cfg.headers = (map[string]string)(o)
}
// WithHeaders allows one to tell the driver to send additional HTTP
// headers with the payloads. Specifying headers like Content-Length,
// Content-Encoding and Content-Type may result in a broken driver.
func WithHeaders(headers map[string]string) Option {
return (headersOption)(headers)
}