1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-26 03:52:03 +02:00
Gustavo Silva Paiva b33edaa552
OTLP metrics gRPC exporter (#1991)
* OTLP metrics gRPC exporter

* add newline

* address comments
2021-06-11 13:25:56 -07:00

739 lines
21 KiB
Go

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpmetricgrpc_test
import (
"context"
"fmt"
"net"
"strings"
"testing"
"time"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpmetrictest"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
)
var (
oneRecord = otlpmetrictest.OneRecordCheckpointSet{}
)
func TestNewExporter_endToEnd(t *testing.T) {
tests := []struct {
name string
additionalOpts []otlpmetricgrpc.Option
}{
{
name: "StandardExporter",
},
{
name: "WithCompressor",
additionalOpts: []otlpmetricgrpc.Option{
otlpmetricgrpc.WithCompressor(gzip.Name),
},
},
{
name: "WithServiceConfig",
additionalOpts: []otlpmetricgrpc.Option{
otlpmetricgrpc.WithServiceConfig("{}"),
},
},
{
name: "WithDialOptions",
additionalOpts: []otlpmetricgrpc.Option{
otlpmetricgrpc.WithDialOption(grpc.WithBlock()),
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
newExporterEndToEndTest(t, test.additionalOpts)
})
}
}
func newGRPCExporter(t *testing.T, ctx context.Context, endpoint string, additionalOpts ...otlpmetricgrpc.Option) *otlpmetric.Exporter {
opts := []otlpmetricgrpc.Option{
otlpmetricgrpc.WithInsecure(),
otlpmetricgrpc.WithEndpoint(endpoint),
otlpmetricgrpc.WithReconnectionPeriod(50 * time.Millisecond),
}
opts = append(opts, additionalOpts...)
client := otlpmetricgrpc.NewClient(opts...)
exp, err := otlpmetric.New(ctx, client)
if err != nil {
t.Fatalf("failed to create a new collector exporter: %v", err)
}
return exp
}
func newExporterEndToEndTest(t *testing.T, additionalOpts []otlpmetricgrpc.Option) {
mc := runMockCollectorAtEndpoint(t, "localhost:56561")
defer func() {
_ = mc.stop()
}()
<-time.After(5 * time.Millisecond)
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint, additionalOpts...)
defer func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
panic(err)
}
}()
otlpmetrictest.RunEndToEndTest(ctx, t, exp, mc)
}
func TestExporterShutdown(t *testing.T) {
mc := runMockCollectorAtEndpoint(t, "localhost:56561")
defer func() {
_ = mc.Stop()
}()
<-time.After(5 * time.Millisecond)
otlpmetrictest.RunExporterShutdownTest(t, func() otlpmetric.Client {
return otlpmetricgrpc.NewClient(
otlpmetricgrpc.WithInsecure(),
otlpmetricgrpc.WithEndpoint(mc.endpoint),
otlpmetricgrpc.WithReconnectionPeriod(50*time.Millisecond),
)
})
}
func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
mc := runMockCollector(t)
defer func() {
_ = mc.stop()
}()
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint)
defer func() {
if err := exp.Shutdown(ctx); err != nil {
panic(err)
}
}()
// Invoke Start numerous times, should return errAlreadyStarted
for i := 0; i < 10; i++ {
if err := exp.Start(ctx); err == nil || !strings.Contains(err.Error(), "already started") {
t.Fatalf("#%d unexpected Start error: %v", i, err)
}
}
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf("failed to Shutdown the exporter: %v", err)
}
// Invoke Shutdown numerous times
for i := 0; i < 10; i++ {
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf(`#%d got error (%v) expected none`, i, err)
}
}
}
func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *testing.T) {
mc := runMockCollector(t)
reconnectionPeriod := 20 * time.Millisecond
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetrySettings{Enabled: false}),
otlpmetricgrpc.WithReconnectionPeriod(reconnectionPeriod))
defer func() { require.NoError(t, exp.Shutdown(ctx)) }()
// Wait for a connection.
mc.ln.WaitForConn()
// We'll now stop the collector right away to simulate a connection
// dying in the midst of communication or even not existing before.
require.NoError(t, mc.stop())
// first export, it will send disconnected message to the channel on export failure,
// trigger almost immediate reconnection
require.Error(t, exp.Export(ctx, oneRecord))
// second export, it will detect connection issue, change state of exporter to disconnected and
// send message to disconnected channel but this time reconnection gouroutine will be in (rest mode, not listening to the disconnected channel)
require.Error(t, exp.Export(ctx, oneRecord))
// as a result we have exporter in disconnected state waiting for disconnection message to reconnect
// resurrect collector
nmc := runMockCollectorAtEndpoint(t, mc.endpoint)
// make sure reconnection loop hits beginning and goes back to waiting mode
// after hitting beginning of the loop it should reconnect
nmc.ln.WaitForConn()
n := 10
for i := 0; i < n; i++ {
// when disconnected exp.Export doesnt send disconnected messages again
// it just quits and return last connection error
require.NoError(t, exp.Export(ctx, oneRecord))
}
nmaMetrics := nmc.getMetrics()
if g, w := len(nmaMetrics), n; g != w {
t.Fatalf("Connected collector: metrics: got %d want %d", g, w)
}
dMetrics := mc.getMetrics()
// Expecting 0 metrics to have been received by the original but now dead collector
if g, w := len(dMetrics), 0; g != w {
t.Fatalf("Disconnected collector: spans: got %d want %d", g, w)
}
require.NoError(t, nmc.Stop())
}
func TestExporterExportFailureAndRecoveryModes(t *testing.T) {
tts := []struct {
name string
errors []error
rs otlpmetricgrpc.RetrySettings
fn func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector)
opts []otlpmetricgrpc.Option
}{
{
name: "Do not retry if succeeded",
fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) {
require.NoError(t, exp.Export(ctx, oneRecord))
metrics := mc.getMetrics()
require.Len(t, metrics, 1)
require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 success request.")
},
},
{
name: "Do not retry if 'error' is ok",
errors: []error{
status.Error(codes.OK, ""),
},
fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) {
require.NoError(t, exp.Export(ctx, oneRecord))
metrics := mc.getMetrics()
require.Len(t, metrics, 0)
require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 error OK request.")
},
},
{
name: "Fail three times and succeed",
rs: otlpmetricgrpc.RetrySettings{
Enabled: true,
MaxElapsedTime: 300 * time.Millisecond,
InitialInterval: 2 * time.Millisecond,
MaxInterval: 10 * time.Millisecond,
},
errors: []error{
status.Error(codes.Unavailable, "backend under pressure"),
status.Error(codes.Unavailable, "backend under pressure"),
status.Error(codes.Unavailable, "backend under pressure"),
},
fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) {
require.NoError(t, exp.Export(ctx, oneRecord))
metrics := mc.getMetrics()
require.Len(t, metrics, 1)
require.Equal(t, 4, mc.metricSvc.requests, "metric service must receive 3 failure requests and 1 success request.")
},
},
{
name: "Permanent error should not be retried",
rs: otlpmetricgrpc.RetrySettings{
Enabled: true,
MaxElapsedTime: 300 * time.Millisecond,
InitialInterval: 2 * time.Millisecond,
MaxInterval: 10 * time.Millisecond,
},
errors: []error{
status.Error(codes.InvalidArgument, "invalid arguments"),
},
fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) {
require.Error(t, exp.Export(ctx, oneRecord))
metric := mc.getMetrics()
require.Len(t, metric, 0)
require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 error requests.")
},
},
{
name: "Test all transient errors and succeed",
rs: otlpmetricgrpc.RetrySettings{
Enabled: true,
MaxElapsedTime: 500 * time.Millisecond,
InitialInterval: 1 * time.Millisecond,
MaxInterval: 2 * time.Millisecond,
},
errors: []error{
status.Error(codes.Canceled, ""),
status.Error(codes.DeadlineExceeded, ""),
status.Error(codes.ResourceExhausted, ""),
status.Error(codes.Aborted, ""),
status.Error(codes.OutOfRange, ""),
status.Error(codes.Unavailable, ""),
status.Error(codes.DataLoss, ""),
},
fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) {
require.NoError(t, exp.Export(ctx, oneRecord))
metrics := mc.getMetrics()
require.Len(t, metrics, 1)
require.Equal(t, 8, mc.metricSvc.requests, "metric service must receive 9 failure requests and 1 success request.")
},
},
{
name: "Retry should honor server throttling",
rs: otlpmetricgrpc.RetrySettings{
Enabled: true,
MaxElapsedTime: time.Minute,
InitialInterval: time.Nanosecond,
MaxInterval: time.Nanosecond,
},
opts: []otlpmetricgrpc.Option{
otlpmetricgrpc.WithTimeout(time.Millisecond * 100),
},
errors: []error{
newThrottlingError(codes.ResourceExhausted, time.Second*30),
},
fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) {
err := exp.Export(ctx, oneRecord)
require.Error(t, err)
require.Equal(t, "context deadline exceeded", err.Error())
metrics := mc.getMetrics()
require.Len(t, metrics, 0)
require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 failure requests and 1 success request.")
},
},
{
name: "Retry should fail if server throttling is higher than the MaxElapsedTime",
rs: otlpmetricgrpc.RetrySettings{
Enabled: true,
MaxElapsedTime: time.Millisecond * 100,
InitialInterval: time.Nanosecond,
MaxInterval: time.Nanosecond,
},
errors: []error{
newThrottlingError(codes.ResourceExhausted, time.Minute),
},
fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) {
err := exp.Export(ctx, oneRecord)
require.Error(t, err)
require.Equal(t, "max elapsed time expired when respecting server throttle: rpc error: code = ResourceExhausted desc = ", err.Error())
metrics := mc.getMetrics()
require.Len(t, metrics, 0)
require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 failure requests and 1 success request.")
},
},
{
name: "Retry stops if takes too long",
rs: otlpmetricgrpc.RetrySettings{
Enabled: true,
MaxElapsedTime: time.Millisecond * 100,
InitialInterval: time.Millisecond * 50,
MaxInterval: time.Millisecond * 50,
},
errors: []error{
status.Error(codes.Unavailable, "unavailable"),
status.Error(codes.Unavailable, "unavailable"),
status.Error(codes.Unavailable, "unavailable"),
status.Error(codes.Unavailable, "unavailable"),
status.Error(codes.Unavailable, "unavailable"),
status.Error(codes.Unavailable, "unavailable"),
},
fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) {
err := exp.Export(ctx, oneRecord)
require.Error(t, err)
require.Equal(t, "max elapsed time expired: rpc error: code = Unavailable desc = unavailable", err.Error())
metrics := mc.getMetrics()
require.Len(t, metrics, 0)
require.LessOrEqual(t, 1, mc.metricSvc.requests, "metric service must receive at least 1 failure requests.")
},
},
{
name: "Disabled retry",
rs: otlpmetricgrpc.RetrySettings{
Enabled: false,
},
errors: []error{
status.Error(codes.Unavailable, "unavailable"),
},
fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) {
err := exp.Export(ctx, oneRecord)
require.Error(t, err)
require.Equal(t, "rpc error: code = Unavailable desc = unavailable", err.Error())
metrics := mc.getMetrics()
require.Len(t, metrics, 0)
require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 failure requests.")
},
},
}
for _, tt := range tts {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
mc := runMockCollectorWithConfig(t, &mockConfig{
errors: tt.errors,
})
opts := []otlpmetricgrpc.Option{
otlpmetricgrpc.WithRetry(tt.rs),
}
if len(tt.opts) != 0 {
opts = append(opts, tt.opts...)
}
exp := newGRPCExporter(t, ctx, mc.endpoint, opts...)
tt.fn(t, ctx, exp, mc)
require.NoError(t, mc.Stop())
require.NoError(t, exp.Shutdown(ctx))
})
}
}
func TestPermanentErrorsShouldNotBeRetried(t *testing.T) {
permanentErrors := []*status.Status{
status.New(codes.Unknown, "Unknown"),
status.New(codes.InvalidArgument, "InvalidArgument"),
status.New(codes.NotFound, "NotFound"),
status.New(codes.AlreadyExists, "AlreadyExists"),
status.New(codes.FailedPrecondition, "FailedPrecondition"),
status.New(codes.Unimplemented, "Unimplemented"),
status.New(codes.Internal, "Internal"),
status.New(codes.PermissionDenied, ""),
status.New(codes.Unauthenticated, ""),
}
for _, sts := range permanentErrors {
t.Run(sts.Code().String(), func(t *testing.T) {
ctx := context.Background()
mc := runMockCollectorWithConfig(t, &mockConfig{
errors: []error{sts.Err()},
})
exp := newGRPCExporter(t, ctx, mc.endpoint)
err := exp.Export(ctx, oneRecord)
require.Error(t, err)
require.Len(t, mc.getMetrics(), 0)
require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 permanent error requests.")
require.NoError(t, mc.Stop())
require.NoError(t, exp.Shutdown(ctx))
})
}
}
func newThrottlingError(code codes.Code, duration time.Duration) error {
s := status.New(code, "")
s, _ = s.WithDetails(&errdetails.RetryInfo{RetryDelay: durationpb.New(duration)})
return s.Err()
}
func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
mc := runMockCollector(t)
reconnectionPeriod := 50 * time.Millisecond
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetrySettings{Enabled: false}),
otlpmetricgrpc.WithReconnectionPeriod(reconnectionPeriod))
defer func() { require.NoError(t, exp.Shutdown(ctx)) }()
mc.ln.WaitForConn()
// We'll now stop the collector right away to simulate a connection
// dying in the midst of communication or even not existing before.
require.NoError(t, mc.stop())
// In the test below, we'll stop the collector many times,
// while exporting metrics and test to ensure that we can
// reconnect.
for j := 0; j < 3; j++ {
// No endpoint up.
require.Error(t, exp.Export(ctx, oneRecord))
// Now resurrect the collector by making a new one but reusing the
// old endpoint, and the collector should reconnect automatically.
nmc := runMockCollectorAtEndpoint(t, mc.endpoint)
// Give the exporter sometime to reconnect
nmc.ln.WaitForConn()
n := 10
for i := 0; i < n; i++ {
require.NoError(t, exp.Export(ctx, oneRecord))
}
nmaMetrics := nmc.getMetrics()
// Expecting 10 metrics that were sampled, given that
if g, w := len(nmaMetrics), n; g != w {
t.Fatalf("Round #%d: Connected collector: spans: got %d want %d", j, g, w)
}
dMetrics := mc.getMetrics()
// Expecting 0 metrics to have been received by the original but now dead collector
if g, w := len(dMetrics), 0; g != w {
t.Fatalf("Round #%d: Disconnected collector: spans: got %d want %d", j, g, w)
}
// Disconnect for the next try.
require.NoError(t, nmc.stop())
}
}
// This test takes a long time to run: to skip it, run tests using: -short
func TestNewExporter_collectorOnBadConnection(t *testing.T) {
if testing.Short() {
t.Skipf("Skipping this long running test")
}
ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to grab an available port: %v", err)
}
// Firstly close the "collector's" channel: optimistically this endpoint won't get reused ASAP
// However, our goal of closing it is to simulate an unavailable connection
_ = ln.Close()
_, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String())
endpoint := fmt.Sprintf("localhost:%s", collectorPortStr)
ctx := context.Background()
exp := newGRPCExporter(t, ctx, endpoint)
_ = exp.Shutdown(ctx)
}
func TestNewExporter_withEndpoint(t *testing.T) {
mc := runMockCollector(t)
defer func() {
_ = mc.stop()
}()
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint)
_ = exp.Shutdown(ctx)
}
func TestNewExporter_withHeaders(t *testing.T) {
mc := runMockCollector(t)
defer func() {
_ = mc.stop()
}()
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlpmetricgrpc.WithHeaders(map[string]string{"header1": "value1"}))
require.NoError(t, exp.Export(ctx, oneRecord))
defer func() {
_ = exp.Shutdown(ctx)
}()
headers := mc.getHeaders()
require.Len(t, headers.Get("header1"), 1)
assert.Equal(t, "value1", headers.Get("header1")[0])
}
func TestNewExporter_WithTimeout(t *testing.T) {
tts := []struct {
name string
fn func(exp *otlpmetric.Exporter) error
timeout time.Duration
metrics int
spans int
code codes.Code
delay bool
}{
{
name: "Timeout Metrics",
fn: func(exp *otlpmetric.Exporter) error {
return exp.Export(context.Background(), oneRecord)
},
timeout: time.Millisecond * 100,
code: codes.DeadlineExceeded,
delay: true,
},
{
name: "No Timeout Metrics",
fn: func(exp *otlpmetric.Exporter) error {
return exp.Export(context.Background(), oneRecord)
},
timeout: time.Minute,
metrics: 1,
code: codes.OK,
},
}
for _, tt := range tts {
t.Run(tt.name, func(t *testing.T) {
mc := runMockCollector(t)
if tt.delay {
mc.metricSvc.delay = time.Second * 10
}
defer func() {
_ = mc.stop()
}()
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint, otlpmetricgrpc.WithTimeout(tt.timeout), otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetrySettings{Enabled: false}))
defer func() {
_ = exp.Shutdown(ctx)
}()
err := tt.fn(exp)
if tt.code == codes.OK {
require.NoError(t, err)
} else {
require.Error(t, err)
}
s := status.Convert(err)
require.Equal(t, tt.code, s.Code())
require.Len(t, mc.getMetrics(), tt.metrics)
})
}
}
func TestNewExporter_withInvalidSecurityConfiguration(t *testing.T) {
mc := runMockCollector(t)
defer func() {
_ = mc.stop()
}()
ctx := context.Background()
client := otlpmetricgrpc.NewClient(otlpmetricgrpc.WithEndpoint(mc.endpoint))
exp, err := otlpmetric.New(ctx, client)
if err != nil {
t.Fatalf("failed to create a new collector exporter: %v", err)
}
err = exp.Export(ctx, oneRecord)
expectedErr := fmt.Sprintf("metrics exporter is disconnected from the server %s: grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)", mc.endpoint)
require.Error(t, err)
require.Equal(t, expectedErr, err.Error())
defer func() {
_ = exp.Shutdown(ctx)
}()
}
func TestDisconnected(t *testing.T) {
ctx := context.Background()
// The endpoint is whatever, we want to be disconnected. But we
// setting a blocking connection, so dialing to the invalid
// endpoint actually fails.
exp := newGRPCExporter(t, ctx, "invalid",
otlpmetricgrpc.WithReconnectionPeriod(time.Hour),
otlpmetricgrpc.WithDialOption(
grpc.WithBlock(),
grpc.FailOnNonTempDialError(true),
),
)
defer func() {
assert.NoError(t, exp.Shutdown(ctx))
}()
assert.Error(t, exp.Export(ctx, oneRecord))
}
func TestEmptyData(t *testing.T) {
mc := runMockCollectorAtEndpoint(t, "localhost:56561")
defer func() {
_ = mc.stop()
}()
<-time.After(5 * time.Millisecond)
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint)
defer func() {
assert.NoError(t, exp.Shutdown(ctx))
}()
assert.NoError(t, exp.Export(ctx, otlpmetrictest.EmptyCheckpointSet{}))
}
func TestFailedMetricTransform(t *testing.T) {
mc := runMockCollectorAtEndpoint(t, "localhost:56561")
defer func() {
_ = mc.stop()
}()
<-time.After(5 * time.Millisecond)
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint)
defer func() {
assert.NoError(t, exp.Shutdown(ctx))
}()
assert.Error(t, exp.Export(ctx, otlpmetrictest.FailCheckpointSet{}))
}