2021-06-11 22:25:56 +02:00
|
|
|
// Copyright The OpenTelemetry Authors
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
|
|
|
package otlpmetricgrpc_test
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"net"
|
|
|
|
"strings"
|
|
|
|
"testing"
|
|
|
|
"time"
|
|
|
|
|
2021-11-13 18:35:04 +02:00
|
|
|
"github.com/stretchr/testify/assert"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"google.golang.org/grpc"
|
2021-06-11 22:25:56 +02:00
|
|
|
"google.golang.org/grpc/codes"
|
2021-11-13 18:35:04 +02:00
|
|
|
"google.golang.org/grpc/encoding/gzip"
|
2021-06-11 22:25:56 +02:00
|
|
|
"google.golang.org/grpc/status"
|
|
|
|
|
2021-11-13 18:35:04 +02:00
|
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
|
|
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpmetrictest"
|
|
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
|
|
|
|
"go.opentelemetry.io/otel/sdk/resource"
|
2021-06-11 22:25:56 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
var (
|
2021-09-27 17:51:47 +02:00
|
|
|
oneRecord = otlpmetrictest.OneRecordReader()
|
2021-08-13 00:44:58 +02:00
|
|
|
|
|
|
|
testResource = resource.Empty()
|
2021-06-11 22:25:56 +02:00
|
|
|
)
|
|
|
|
|
2022-04-15 17:47:09 +02:00
|
|
|
func TestNewExporterEndToEnd(t *testing.T) {
|
2021-06-11 22:25:56 +02:00
|
|
|
tests := []struct {
|
|
|
|
name string
|
|
|
|
additionalOpts []otlpmetricgrpc.Option
|
|
|
|
}{
|
|
|
|
{
|
|
|
|
name: "StandardExporter",
|
|
|
|
},
|
|
|
|
{
|
|
|
|
name: "WithCompressor",
|
|
|
|
additionalOpts: []otlpmetricgrpc.Option{
|
|
|
|
otlpmetricgrpc.WithCompressor(gzip.Name),
|
|
|
|
},
|
|
|
|
},
|
|
|
|
{
|
|
|
|
name: "WithServiceConfig",
|
|
|
|
additionalOpts: []otlpmetricgrpc.Option{
|
|
|
|
otlpmetricgrpc.WithServiceConfig("{}"),
|
|
|
|
},
|
|
|
|
},
|
|
|
|
{
|
|
|
|
name: "WithDialOptions",
|
|
|
|
additionalOpts: []otlpmetricgrpc.Option{
|
|
|
|
otlpmetricgrpc.WithDialOption(grpc.WithBlock()),
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, test := range tests {
|
|
|
|
t.Run(test.name, func(t *testing.T) {
|
|
|
|
newExporterEndToEndTest(t, test.additionalOpts)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func newGRPCExporter(t *testing.T, ctx context.Context, endpoint string, additionalOpts ...otlpmetricgrpc.Option) *otlpmetric.Exporter {
|
|
|
|
opts := []otlpmetricgrpc.Option{
|
|
|
|
otlpmetricgrpc.WithInsecure(),
|
|
|
|
otlpmetricgrpc.WithEndpoint(endpoint),
|
|
|
|
otlpmetricgrpc.WithReconnectionPeriod(50 * time.Millisecond),
|
|
|
|
}
|
|
|
|
|
|
|
|
opts = append(opts, additionalOpts...)
|
|
|
|
client := otlpmetricgrpc.NewClient(opts...)
|
|
|
|
exp, err := otlpmetric.New(ctx, client)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("failed to create a new collector exporter: %v", err)
|
|
|
|
}
|
|
|
|
return exp
|
|
|
|
}
|
|
|
|
|
|
|
|
func newExporterEndToEndTest(t *testing.T, additionalOpts []otlpmetricgrpc.Option) {
|
2022-02-23 01:03:17 +02:00
|
|
|
mc := runMockCollector(t)
|
2021-06-11 22:25:56 +02:00
|
|
|
|
|
|
|
defer func() {
|
|
|
|
_ = mc.stop()
|
|
|
|
}()
|
|
|
|
|
|
|
|
<-time.After(5 * time.Millisecond)
|
|
|
|
|
|
|
|
ctx := context.Background()
|
|
|
|
exp := newGRPCExporter(t, ctx, mc.endpoint, additionalOpts...)
|
|
|
|
defer func() {
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
|
|
|
defer cancel()
|
|
|
|
if err := exp.Shutdown(ctx); err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
otlpmetrictest.RunEndToEndTest(ctx, t, exp, mc)
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestExporterShutdown(t *testing.T) {
|
2022-02-23 01:03:17 +02:00
|
|
|
mc := runMockCollector(t)
|
2021-06-11 22:25:56 +02:00
|
|
|
defer func() {
|
|
|
|
_ = mc.Stop()
|
|
|
|
}()
|
|
|
|
|
|
|
|
<-time.After(5 * time.Millisecond)
|
|
|
|
|
|
|
|
otlpmetrictest.RunExporterShutdownTest(t, func() otlpmetric.Client {
|
|
|
|
return otlpmetricgrpc.NewClient(
|
|
|
|
otlpmetricgrpc.WithInsecure(),
|
|
|
|
otlpmetricgrpc.WithEndpoint(mc.endpoint),
|
|
|
|
otlpmetricgrpc.WithReconnectionPeriod(50*time.Millisecond),
|
|
|
|
)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2022-04-15 17:47:09 +02:00
|
|
|
func TestNewExporterInvokeStartThenStopManyTimes(t *testing.T) {
|
2021-06-11 22:25:56 +02:00
|
|
|
mc := runMockCollector(t)
|
|
|
|
defer func() {
|
|
|
|
_ = mc.stop()
|
|
|
|
}()
|
|
|
|
|
|
|
|
ctx := context.Background()
|
|
|
|
exp := newGRPCExporter(t, ctx, mc.endpoint)
|
|
|
|
defer func() {
|
|
|
|
if err := exp.Shutdown(ctx); err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Invoke Start numerous times, should return errAlreadyStarted
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
if err := exp.Start(ctx); err == nil || !strings.Contains(err.Error(), "already started") {
|
|
|
|
t.Fatalf("#%d unexpected Start error: %v", i, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := exp.Shutdown(ctx); err != nil {
|
|
|
|
t.Fatalf("failed to Shutdown the exporter: %v", err)
|
|
|
|
}
|
|
|
|
// Invoke Shutdown numerous times
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
if err := exp.Shutdown(ctx); err != nil {
|
|
|
|
t.Fatalf(`#%d got error (%v) expected none`, i, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-25 22:22:49 +02:00
|
|
|
// This test takes a long time to run: to skip it, run tests using: -short.
|
2022-04-15 17:47:09 +02:00
|
|
|
func TestNewExporterCollectorOnBadConnection(t *testing.T) {
|
2021-06-11 22:25:56 +02:00
|
|
|
if testing.Short() {
|
|
|
|
t.Skipf("Skipping this long running test")
|
|
|
|
}
|
|
|
|
|
|
|
|
ln, err := net.Listen("tcp", "localhost:0")
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("Failed to grab an available port: %v", err)
|
|
|
|
}
|
|
|
|
// Firstly close the "collector's" channel: optimistically this endpoint won't get reused ASAP
|
|
|
|
// However, our goal of closing it is to simulate an unavailable connection
|
|
|
|
_ = ln.Close()
|
|
|
|
|
|
|
|
_, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String())
|
|
|
|
|
|
|
|
endpoint := fmt.Sprintf("localhost:%s", collectorPortStr)
|
|
|
|
ctx := context.Background()
|
|
|
|
exp := newGRPCExporter(t, ctx, endpoint)
|
|
|
|
_ = exp.Shutdown(ctx)
|
|
|
|
}
|
|
|
|
|
2022-04-15 17:47:09 +02:00
|
|
|
func TestNewExporterWithEndpoint(t *testing.T) {
|
2021-06-11 22:25:56 +02:00
|
|
|
mc := runMockCollector(t)
|
|
|
|
defer func() {
|
|
|
|
_ = mc.stop()
|
|
|
|
}()
|
|
|
|
|
|
|
|
ctx := context.Background()
|
|
|
|
exp := newGRPCExporter(t, ctx, mc.endpoint)
|
|
|
|
_ = exp.Shutdown(ctx)
|
|
|
|
}
|
|
|
|
|
2022-04-15 17:47:09 +02:00
|
|
|
func TestNewExporterWithHeaders(t *testing.T) {
|
2021-06-11 22:25:56 +02:00
|
|
|
mc := runMockCollector(t)
|
|
|
|
defer func() {
|
|
|
|
_ = mc.stop()
|
|
|
|
}()
|
|
|
|
|
|
|
|
ctx := context.Background()
|
|
|
|
exp := newGRPCExporter(t, ctx, mc.endpoint,
|
|
|
|
otlpmetricgrpc.WithHeaders(map[string]string{"header1": "value1"}))
|
2021-08-13 00:44:58 +02:00
|
|
|
require.NoError(t, exp.Export(ctx, testResource, oneRecord))
|
2021-06-11 22:25:56 +02:00
|
|
|
|
|
|
|
defer func() {
|
|
|
|
_ = exp.Shutdown(ctx)
|
|
|
|
}()
|
|
|
|
|
|
|
|
headers := mc.getHeaders()
|
|
|
|
require.Len(t, headers.Get("header1"), 1)
|
|
|
|
assert.Equal(t, "value1", headers.Get("header1")[0])
|
|
|
|
}
|
|
|
|
|
2022-04-15 17:47:09 +02:00
|
|
|
func TestNewExporterWithTimeout(t *testing.T) {
|
2021-06-11 22:25:56 +02:00
|
|
|
tts := []struct {
|
|
|
|
name string
|
|
|
|
fn func(exp *otlpmetric.Exporter) error
|
|
|
|
timeout time.Duration
|
|
|
|
metrics int
|
|
|
|
spans int
|
|
|
|
code codes.Code
|
|
|
|
delay bool
|
|
|
|
}{
|
|
|
|
{
|
|
|
|
name: "Timeout Metrics",
|
|
|
|
fn: func(exp *otlpmetric.Exporter) error {
|
2021-08-13 00:44:58 +02:00
|
|
|
return exp.Export(context.Background(), testResource, oneRecord)
|
2021-06-11 22:25:56 +02:00
|
|
|
},
|
|
|
|
timeout: time.Millisecond * 100,
|
|
|
|
code: codes.DeadlineExceeded,
|
|
|
|
delay: true,
|
|
|
|
},
|
|
|
|
|
|
|
|
{
|
|
|
|
name: "No Timeout Metrics",
|
|
|
|
fn: func(exp *otlpmetric.Exporter) error {
|
2021-08-13 00:44:58 +02:00
|
|
|
return exp.Export(context.Background(), testResource, oneRecord)
|
2021-06-11 22:25:56 +02:00
|
|
|
},
|
|
|
|
timeout: time.Minute,
|
|
|
|
metrics: 1,
|
|
|
|
code: codes.OK,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, tt := range tts {
|
|
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
|
|
mc := runMockCollector(t)
|
|
|
|
if tt.delay {
|
|
|
|
mc.metricSvc.delay = time.Second * 10
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
_ = mc.stop()
|
|
|
|
}()
|
|
|
|
|
|
|
|
ctx := context.Background()
|
2021-12-03 23:59:07 +02:00
|
|
|
exp := newGRPCExporter(t, ctx, mc.endpoint, otlpmetricgrpc.WithTimeout(tt.timeout), otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetryConfig{Enabled: false}))
|
2021-06-11 22:25:56 +02:00
|
|
|
defer func() {
|
|
|
|
_ = exp.Shutdown(ctx)
|
|
|
|
}()
|
|
|
|
|
|
|
|
err := tt.fn(exp)
|
|
|
|
|
|
|
|
if tt.code == codes.OK {
|
|
|
|
require.NoError(t, err)
|
|
|
|
} else {
|
|
|
|
require.Error(t, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
s := status.Convert(err)
|
|
|
|
require.Equal(t, tt.code, s.Code())
|
|
|
|
|
|
|
|
require.Len(t, mc.getMetrics(), tt.metrics)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-12-03 23:59:07 +02:00
|
|
|
func TestStartErrorInvalidAddress(t *testing.T) {
|
|
|
|
client := otlpmetricgrpc.NewClient(
|
|
|
|
otlpmetricgrpc.WithInsecure(),
|
|
|
|
// Validate the connection in Start (which should return the error).
|
2021-06-11 22:25:56 +02:00
|
|
|
otlpmetricgrpc.WithDialOption(
|
|
|
|
grpc.WithBlock(),
|
|
|
|
grpc.FailOnNonTempDialError(true),
|
|
|
|
),
|
2021-12-03 23:59:07 +02:00
|
|
|
otlpmetricgrpc.WithEndpoint("invalid"),
|
|
|
|
otlpmetricgrpc.WithReconnectionPeriod(time.Hour),
|
2021-06-11 22:25:56 +02:00
|
|
|
)
|
2021-12-03 23:59:07 +02:00
|
|
|
err := client.Start(context.Background())
|
|
|
|
assert.EqualError(t, err, `connection error: desc = "transport: error while dialing: dial tcp: address invalid: missing port in address"`)
|
2021-06-11 22:25:56 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func TestEmptyData(t *testing.T) {
|
2022-02-23 01:03:17 +02:00
|
|
|
mc := runMockCollector(t)
|
2021-06-11 22:25:56 +02:00
|
|
|
|
|
|
|
defer func() {
|
|
|
|
_ = mc.stop()
|
|
|
|
}()
|
|
|
|
|
|
|
|
<-time.After(5 * time.Millisecond)
|
|
|
|
|
|
|
|
ctx := context.Background()
|
|
|
|
exp := newGRPCExporter(t, ctx, mc.endpoint)
|
|
|
|
defer func() {
|
|
|
|
assert.NoError(t, exp.Shutdown(ctx))
|
|
|
|
}()
|
|
|
|
|
2021-09-27 17:51:47 +02:00
|
|
|
assert.NoError(t, exp.Export(ctx, testResource, otlpmetrictest.EmptyReader()))
|
2021-06-11 22:25:56 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func TestFailedMetricTransform(t *testing.T) {
|
2022-02-23 01:03:17 +02:00
|
|
|
mc := runMockCollector(t)
|
2021-06-11 22:25:56 +02:00
|
|
|
|
|
|
|
defer func() {
|
|
|
|
_ = mc.stop()
|
|
|
|
}()
|
|
|
|
|
|
|
|
<-time.After(5 * time.Millisecond)
|
|
|
|
|
|
|
|
ctx := context.Background()
|
|
|
|
exp := newGRPCExporter(t, ctx, mc.endpoint)
|
|
|
|
defer func() {
|
|
|
|
assert.NoError(t, exp.Shutdown(ctx))
|
|
|
|
}()
|
|
|
|
|
2021-09-27 17:51:47 +02:00
|
|
|
assert.Error(t, exp.Export(ctx, testResource, otlpmetrictest.FailReader{}))
|
2021-06-11 22:25:56 +02:00
|
|
|
}
|