2021-06-11 22:25:56 +02:00
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpmetricgrpc_test
import (
"context"
"fmt"
"net"
"strings"
"testing"
"time"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpmetrictest"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
2021-08-13 00:44:58 +02:00
"go.opentelemetry.io/otel/sdk/resource"
2021-06-11 22:25:56 +02:00
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
)
var (
2021-09-27 17:51:47 +02:00
oneRecord = otlpmetrictest . OneRecordReader ( )
2021-08-13 00:44:58 +02:00
testResource = resource . Empty ( )
2021-06-11 22:25:56 +02:00
)
func TestNewExporter_endToEnd ( t * testing . T ) {
tests := [ ] struct {
name string
additionalOpts [ ] otlpmetricgrpc . Option
} {
{
name : "StandardExporter" ,
} ,
{
name : "WithCompressor" ,
additionalOpts : [ ] otlpmetricgrpc . Option {
otlpmetricgrpc . WithCompressor ( gzip . Name ) ,
} ,
} ,
{
name : "WithServiceConfig" ,
additionalOpts : [ ] otlpmetricgrpc . Option {
otlpmetricgrpc . WithServiceConfig ( "{}" ) ,
} ,
} ,
{
name : "WithDialOptions" ,
additionalOpts : [ ] otlpmetricgrpc . Option {
otlpmetricgrpc . WithDialOption ( grpc . WithBlock ( ) ) ,
} ,
} ,
}
for _ , test := range tests {
t . Run ( test . name , func ( t * testing . T ) {
newExporterEndToEndTest ( t , test . additionalOpts )
} )
}
}
func newGRPCExporter ( t * testing . T , ctx context . Context , endpoint string , additionalOpts ... otlpmetricgrpc . Option ) * otlpmetric . Exporter {
opts := [ ] otlpmetricgrpc . Option {
otlpmetricgrpc . WithInsecure ( ) ,
otlpmetricgrpc . WithEndpoint ( endpoint ) ,
otlpmetricgrpc . WithReconnectionPeriod ( 50 * time . Millisecond ) ,
}
opts = append ( opts , additionalOpts ... )
client := otlpmetricgrpc . NewClient ( opts ... )
exp , err := otlpmetric . New ( ctx , client )
if err != nil {
t . Fatalf ( "failed to create a new collector exporter: %v" , err )
}
return exp
}
func newExporterEndToEndTest ( t * testing . T , additionalOpts [ ] otlpmetricgrpc . Option ) {
mc := runMockCollectorAtEndpoint ( t , "localhost:56561" )
defer func ( ) {
_ = mc . stop ( )
} ( )
<- time . After ( 5 * time . Millisecond )
ctx := context . Background ( )
exp := newGRPCExporter ( t , ctx , mc . endpoint , additionalOpts ... )
defer func ( ) {
ctx , cancel := context . WithTimeout ( ctx , 10 * time . Second )
defer cancel ( )
if err := exp . Shutdown ( ctx ) ; err != nil {
panic ( err )
}
} ( )
otlpmetrictest . RunEndToEndTest ( ctx , t , exp , mc )
}
func TestExporterShutdown ( t * testing . T ) {
mc := runMockCollectorAtEndpoint ( t , "localhost:56561" )
defer func ( ) {
_ = mc . Stop ( )
} ( )
<- time . After ( 5 * time . Millisecond )
otlpmetrictest . RunExporterShutdownTest ( t , func ( ) otlpmetric . Client {
return otlpmetricgrpc . NewClient (
otlpmetricgrpc . WithInsecure ( ) ,
otlpmetricgrpc . WithEndpoint ( mc . endpoint ) ,
otlpmetricgrpc . WithReconnectionPeriod ( 50 * time . Millisecond ) ,
)
} )
}
func TestNewExporter_invokeStartThenStopManyTimes ( t * testing . T ) {
mc := runMockCollector ( t )
defer func ( ) {
_ = mc . stop ( )
} ( )
ctx := context . Background ( )
exp := newGRPCExporter ( t , ctx , mc . endpoint )
defer func ( ) {
if err := exp . Shutdown ( ctx ) ; err != nil {
panic ( err )
}
} ( )
// Invoke Start numerous times, should return errAlreadyStarted
for i := 0 ; i < 10 ; i ++ {
if err := exp . Start ( ctx ) ; err == nil || ! strings . Contains ( err . Error ( ) , "already started" ) {
t . Fatalf ( "#%d unexpected Start error: %v" , i , err )
}
}
if err := exp . Shutdown ( ctx ) ; err != nil {
t . Fatalf ( "failed to Shutdown the exporter: %v" , err )
}
// Invoke Shutdown numerous times
for i := 0 ; i < 10 ; i ++ {
if err := exp . Shutdown ( ctx ) ; err != nil {
t . Fatalf ( ` #%d got error (%v) expected none ` , i , err )
}
}
}
func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode ( t * testing . T ) {
2021-11-12 19:18:59 +02:00
// TODO: Fix this test #1527
t . Skip ( "This test is flaky and needs to be rewritten" )
2021-06-11 22:25:56 +02:00
mc := runMockCollector ( t )
reconnectionPeriod := 20 * time . Millisecond
ctx := context . Background ( )
exp := newGRPCExporter ( t , ctx , mc . endpoint ,
otlpmetricgrpc . WithRetry ( otlpmetricgrpc . RetrySettings { Enabled : false } ) ,
otlpmetricgrpc . WithReconnectionPeriod ( reconnectionPeriod ) )
defer func ( ) { require . NoError ( t , exp . Shutdown ( ctx ) ) } ( )
// Wait for a connection.
mc . ln . WaitForConn ( )
// We'll now stop the collector right away to simulate a connection
// dying in the midst of communication or even not existing before.
require . NoError ( t , mc . stop ( ) )
// first export, it will send disconnected message to the channel on export failure,
// trigger almost immediate reconnection
2021-08-13 00:44:58 +02:00
require . Error ( t , exp . Export ( ctx , testResource , oneRecord ) )
2021-06-11 22:25:56 +02:00
// second export, it will detect connection issue, change state of exporter to disconnected and
// send message to disconnected channel but this time reconnection gouroutine will be in (rest mode, not listening to the disconnected channel)
2021-08-13 00:44:58 +02:00
require . Error ( t , exp . Export ( ctx , testResource , oneRecord ) )
2021-06-11 22:25:56 +02:00
// as a result we have exporter in disconnected state waiting for disconnection message to reconnect
// resurrect collector
nmc := runMockCollectorAtEndpoint ( t , mc . endpoint )
// make sure reconnection loop hits beginning and goes back to waiting mode
// after hitting beginning of the loop it should reconnect
nmc . ln . WaitForConn ( )
n := 10
for i := 0 ; i < n ; i ++ {
// when disconnected exp.Export doesnt send disconnected messages again
// it just quits and return last connection error
2021-08-13 00:44:58 +02:00
require . NoError ( t , exp . Export ( ctx , testResource , oneRecord ) )
2021-06-11 22:25:56 +02:00
}
nmaMetrics := nmc . getMetrics ( )
if g , w := len ( nmaMetrics ) , n ; g != w {
t . Fatalf ( "Connected collector: metrics: got %d want %d" , g , w )
}
dMetrics := mc . getMetrics ( )
// Expecting 0 metrics to have been received by the original but now dead collector
if g , w := len ( dMetrics ) , 0 ; g != w {
t . Fatalf ( "Disconnected collector: spans: got %d want %d" , g , w )
}
require . NoError ( t , nmc . Stop ( ) )
}
func TestExporterExportFailureAndRecoveryModes ( t * testing . T ) {
tts := [ ] struct {
name string
errors [ ] error
rs otlpmetricgrpc . RetrySettings
fn func ( t * testing . T , ctx context . Context , exp * otlpmetric . Exporter , mc * mockCollector )
opts [ ] otlpmetricgrpc . Option
} {
{
name : "Do not retry if succeeded" ,
fn : func ( t * testing . T , ctx context . Context , exp * otlpmetric . Exporter , mc * mockCollector ) {
2021-08-13 00:44:58 +02:00
require . NoError ( t , exp . Export ( ctx , testResource , oneRecord ) )
2021-06-11 22:25:56 +02:00
metrics := mc . getMetrics ( )
require . Len ( t , metrics , 1 )
require . Equal ( t , 1 , mc . metricSvc . requests , "metric service must receive 1 success request." )
} ,
} ,
{
name : "Do not retry if 'error' is ok" ,
errors : [ ] error {
status . Error ( codes . OK , "" ) ,
} ,
fn : func ( t * testing . T , ctx context . Context , exp * otlpmetric . Exporter , mc * mockCollector ) {
2021-08-13 00:44:58 +02:00
require . NoError ( t , exp . Export ( ctx , testResource , oneRecord ) )
2021-06-11 22:25:56 +02:00
metrics := mc . getMetrics ( )
require . Len ( t , metrics , 0 )
require . Equal ( t , 1 , mc . metricSvc . requests , "metric service must receive 1 error OK request." )
} ,
} ,
{
name : "Fail three times and succeed" ,
rs : otlpmetricgrpc . RetrySettings {
Enabled : true ,
MaxElapsedTime : 300 * time . Millisecond ,
InitialInterval : 2 * time . Millisecond ,
MaxInterval : 10 * time . Millisecond ,
} ,
errors : [ ] error {
status . Error ( codes . Unavailable , "backend under pressure" ) ,
status . Error ( codes . Unavailable , "backend under pressure" ) ,
status . Error ( codes . Unavailable , "backend under pressure" ) ,
} ,
fn : func ( t * testing . T , ctx context . Context , exp * otlpmetric . Exporter , mc * mockCollector ) {
2021-08-13 00:44:58 +02:00
require . NoError ( t , exp . Export ( ctx , testResource , oneRecord ) )
2021-06-11 22:25:56 +02:00
metrics := mc . getMetrics ( )
require . Len ( t , metrics , 1 )
require . Equal ( t , 4 , mc . metricSvc . requests , "metric service must receive 3 failure requests and 1 success request." )
} ,
} ,
{
name : "Permanent error should not be retried" ,
rs : otlpmetricgrpc . RetrySettings {
Enabled : true ,
MaxElapsedTime : 300 * time . Millisecond ,
InitialInterval : 2 * time . Millisecond ,
MaxInterval : 10 * time . Millisecond ,
} ,
errors : [ ] error {
status . Error ( codes . InvalidArgument , "invalid arguments" ) ,
} ,
fn : func ( t * testing . T , ctx context . Context , exp * otlpmetric . Exporter , mc * mockCollector ) {
2021-08-13 00:44:58 +02:00
require . Error ( t , exp . Export ( ctx , testResource , oneRecord ) )
2021-06-11 22:25:56 +02:00
metric := mc . getMetrics ( )
require . Len ( t , metric , 0 )
require . Equal ( t , 1 , mc . metricSvc . requests , "metric service must receive 1 error requests." )
} ,
} ,
{
name : "Test all transient errors and succeed" ,
rs : otlpmetricgrpc . RetrySettings {
Enabled : true ,
MaxElapsedTime : 500 * time . Millisecond ,
InitialInterval : 1 * time . Millisecond ,
MaxInterval : 2 * time . Millisecond ,
} ,
errors : [ ] error {
status . Error ( codes . Canceled , "" ) ,
status . Error ( codes . DeadlineExceeded , "" ) ,
status . Error ( codes . ResourceExhausted , "" ) ,
status . Error ( codes . Aborted , "" ) ,
status . Error ( codes . OutOfRange , "" ) ,
status . Error ( codes . Unavailable , "" ) ,
status . Error ( codes . DataLoss , "" ) ,
} ,
fn : func ( t * testing . T , ctx context . Context , exp * otlpmetric . Exporter , mc * mockCollector ) {
2021-08-13 00:44:58 +02:00
require . NoError ( t , exp . Export ( ctx , testResource , oneRecord ) )
2021-06-11 22:25:56 +02:00
metrics := mc . getMetrics ( )
require . Len ( t , metrics , 1 )
require . Equal ( t , 8 , mc . metricSvc . requests , "metric service must receive 9 failure requests and 1 success request." )
} ,
} ,
{
name : "Retry should honor server throttling" ,
rs : otlpmetricgrpc . RetrySettings {
Enabled : true ,
MaxElapsedTime : time . Minute ,
InitialInterval : time . Nanosecond ,
MaxInterval : time . Nanosecond ,
} ,
opts : [ ] otlpmetricgrpc . Option {
otlpmetricgrpc . WithTimeout ( time . Millisecond * 100 ) ,
} ,
errors : [ ] error {
newThrottlingError ( codes . ResourceExhausted , time . Second * 30 ) ,
} ,
fn : func ( t * testing . T , ctx context . Context , exp * otlpmetric . Exporter , mc * mockCollector ) {
2021-08-13 00:44:58 +02:00
err := exp . Export ( ctx , testResource , oneRecord )
2021-06-11 22:25:56 +02:00
require . Error ( t , err )
require . Equal ( t , "context deadline exceeded" , err . Error ( ) )
metrics := mc . getMetrics ( )
require . Len ( t , metrics , 0 )
require . Equal ( t , 1 , mc . metricSvc . requests , "metric service must receive 1 failure requests and 1 success request." )
} ,
} ,
{
name : "Retry should fail if server throttling is higher than the MaxElapsedTime" ,
rs : otlpmetricgrpc . RetrySettings {
Enabled : true ,
MaxElapsedTime : time . Millisecond * 100 ,
InitialInterval : time . Nanosecond ,
MaxInterval : time . Nanosecond ,
} ,
errors : [ ] error {
newThrottlingError ( codes . ResourceExhausted , time . Minute ) ,
} ,
fn : func ( t * testing . T , ctx context . Context , exp * otlpmetric . Exporter , mc * mockCollector ) {
2021-08-13 00:44:58 +02:00
err := exp . Export ( ctx , testResource , oneRecord )
2021-06-11 22:25:56 +02:00
require . Error ( t , err )
require . Equal ( t , "max elapsed time expired when respecting server throttle: rpc error: code = ResourceExhausted desc = " , err . Error ( ) )
metrics := mc . getMetrics ( )
require . Len ( t , metrics , 0 )
require . Equal ( t , 1 , mc . metricSvc . requests , "metric service must receive 1 failure requests and 1 success request." )
} ,
} ,
{
name : "Retry stops if takes too long" ,
rs : otlpmetricgrpc . RetrySettings {
Enabled : true ,
MaxElapsedTime : time . Millisecond * 100 ,
InitialInterval : time . Millisecond * 50 ,
MaxInterval : time . Millisecond * 50 ,
} ,
errors : [ ] error {
status . Error ( codes . Unavailable , "unavailable" ) ,
status . Error ( codes . Unavailable , "unavailable" ) ,
status . Error ( codes . Unavailable , "unavailable" ) ,
status . Error ( codes . Unavailable , "unavailable" ) ,
status . Error ( codes . Unavailable , "unavailable" ) ,
status . Error ( codes . Unavailable , "unavailable" ) ,
} ,
fn : func ( t * testing . T , ctx context . Context , exp * otlpmetric . Exporter , mc * mockCollector ) {
2021-08-13 00:44:58 +02:00
err := exp . Export ( ctx , testResource , oneRecord )
2021-06-11 22:25:56 +02:00
require . Error ( t , err )
require . Equal ( t , "max elapsed time expired: rpc error: code = Unavailable desc = unavailable" , err . Error ( ) )
metrics := mc . getMetrics ( )
require . Len ( t , metrics , 0 )
require . LessOrEqual ( t , 1 , mc . metricSvc . requests , "metric service must receive at least 1 failure requests." )
} ,
} ,
{
name : "Disabled retry" ,
rs : otlpmetricgrpc . RetrySettings {
Enabled : false ,
} ,
errors : [ ] error {
status . Error ( codes . Unavailable , "unavailable" ) ,
} ,
fn : func ( t * testing . T , ctx context . Context , exp * otlpmetric . Exporter , mc * mockCollector ) {
2021-08-13 00:44:58 +02:00
err := exp . Export ( ctx , testResource , oneRecord )
2021-06-11 22:25:56 +02:00
require . Error ( t , err )
require . Equal ( t , "rpc error: code = Unavailable desc = unavailable" , err . Error ( ) )
metrics := mc . getMetrics ( )
require . Len ( t , metrics , 0 )
require . Equal ( t , 1 , mc . metricSvc . requests , "metric service must receive 1 failure requests." )
} ,
} ,
}
for _ , tt := range tts {
t . Run ( tt . name , func ( t * testing . T ) {
ctx := context . Background ( )
mc := runMockCollectorWithConfig ( t , & mockConfig {
errors : tt . errors ,
} )
opts := [ ] otlpmetricgrpc . Option {
otlpmetricgrpc . WithRetry ( tt . rs ) ,
}
if len ( tt . opts ) != 0 {
opts = append ( opts , tt . opts ... )
}
exp := newGRPCExporter ( t , ctx , mc . endpoint , opts ... )
tt . fn ( t , ctx , exp , mc )
require . NoError ( t , mc . Stop ( ) )
require . NoError ( t , exp . Shutdown ( ctx ) )
} )
}
}
func TestPermanentErrorsShouldNotBeRetried ( t * testing . T ) {
permanentErrors := [ ] * status . Status {
status . New ( codes . Unknown , "Unknown" ) ,
status . New ( codes . InvalidArgument , "InvalidArgument" ) ,
status . New ( codes . NotFound , "NotFound" ) ,
status . New ( codes . AlreadyExists , "AlreadyExists" ) ,
status . New ( codes . FailedPrecondition , "FailedPrecondition" ) ,
status . New ( codes . Unimplemented , "Unimplemented" ) ,
status . New ( codes . Internal , "Internal" ) ,
status . New ( codes . PermissionDenied , "" ) ,
status . New ( codes . Unauthenticated , "" ) ,
}
for _ , sts := range permanentErrors {
t . Run ( sts . Code ( ) . String ( ) , func ( t * testing . T ) {
ctx := context . Background ( )
mc := runMockCollectorWithConfig ( t , & mockConfig {
errors : [ ] error { sts . Err ( ) } ,
} )
exp := newGRPCExporter ( t , ctx , mc . endpoint )
2021-08-13 00:44:58 +02:00
err := exp . Export ( ctx , testResource , oneRecord )
2021-06-11 22:25:56 +02:00
require . Error ( t , err )
require . Len ( t , mc . getMetrics ( ) , 0 )
require . Equal ( t , 1 , mc . metricSvc . requests , "metric service must receive 1 permanent error requests." )
require . NoError ( t , mc . Stop ( ) )
require . NoError ( t , exp . Shutdown ( ctx ) )
} )
}
}
func newThrottlingError ( code codes . Code , duration time . Duration ) error {
s := status . New ( code , "" )
s , _ = s . WithDetails ( & errdetails . RetryInfo { RetryDelay : durationpb . New ( duration ) } )
return s . Err ( )
}
func TestNewExporter_collectorConnectionDiesThenReconnects ( t * testing . T ) {
2021-11-12 19:18:59 +02:00
// TODO: Fix this test #1527
t . Skip ( "This test is flaky and needs to be rewritten" )
2021-06-11 22:25:56 +02:00
mc := runMockCollector ( t )
reconnectionPeriod := 50 * time . Millisecond
ctx := context . Background ( )
exp := newGRPCExporter ( t , ctx , mc . endpoint ,
otlpmetricgrpc . WithRetry ( otlpmetricgrpc . RetrySettings { Enabled : false } ) ,
otlpmetricgrpc . WithReconnectionPeriod ( reconnectionPeriod ) )
defer func ( ) { require . NoError ( t , exp . Shutdown ( ctx ) ) } ( )
mc . ln . WaitForConn ( )
// We'll now stop the collector right away to simulate a connection
// dying in the midst of communication or even not existing before.
require . NoError ( t , mc . stop ( ) )
// In the test below, we'll stop the collector many times,
// while exporting metrics and test to ensure that we can
// reconnect.
for j := 0 ; j < 3 ; j ++ {
// No endpoint up.
2021-08-13 00:44:58 +02:00
require . Error ( t , exp . Export ( ctx , testResource , oneRecord ) )
2021-06-11 22:25:56 +02:00
// Now resurrect the collector by making a new one but reusing the
// old endpoint, and the collector should reconnect automatically.
nmc := runMockCollectorAtEndpoint ( t , mc . endpoint )
// Give the exporter sometime to reconnect
nmc . ln . WaitForConn ( )
n := 10
for i := 0 ; i < n ; i ++ {
2021-08-13 00:44:58 +02:00
require . NoError ( t , exp . Export ( ctx , testResource , oneRecord ) )
2021-06-11 22:25:56 +02:00
}
nmaMetrics := nmc . getMetrics ( )
// Expecting 10 metrics that were sampled, given that
if g , w := len ( nmaMetrics ) , n ; g != w {
t . Fatalf ( "Round #%d: Connected collector: spans: got %d want %d" , j , g , w )
}
dMetrics := mc . getMetrics ( )
// Expecting 0 metrics to have been received by the original but now dead collector
if g , w := len ( dMetrics ) , 0 ; g != w {
t . Fatalf ( "Round #%d: Disconnected collector: spans: got %d want %d" , j , g , w )
}
// Disconnect for the next try.
require . NoError ( t , nmc . stop ( ) )
}
}
// This test takes a long time to run: to skip it, run tests using: -short
func TestNewExporter_collectorOnBadConnection ( t * testing . T ) {
if testing . Short ( ) {
t . Skipf ( "Skipping this long running test" )
}
ln , err := net . Listen ( "tcp" , "localhost:0" )
if err != nil {
t . Fatalf ( "Failed to grab an available port: %v" , err )
}
// Firstly close the "collector's" channel: optimistically this endpoint won't get reused ASAP
// However, our goal of closing it is to simulate an unavailable connection
_ = ln . Close ( )
_ , collectorPortStr , _ := net . SplitHostPort ( ln . Addr ( ) . String ( ) )
endpoint := fmt . Sprintf ( "localhost:%s" , collectorPortStr )
ctx := context . Background ( )
exp := newGRPCExporter ( t , ctx , endpoint )
_ = exp . Shutdown ( ctx )
}
func TestNewExporter_withEndpoint ( t * testing . T ) {
mc := runMockCollector ( t )
defer func ( ) {
_ = mc . stop ( )
} ( )
ctx := context . Background ( )
exp := newGRPCExporter ( t , ctx , mc . endpoint )
_ = exp . Shutdown ( ctx )
}
func TestNewExporter_withHeaders ( t * testing . T ) {
mc := runMockCollector ( t )
defer func ( ) {
_ = mc . stop ( )
} ( )
ctx := context . Background ( )
exp := newGRPCExporter ( t , ctx , mc . endpoint ,
otlpmetricgrpc . WithHeaders ( map [ string ] string { "header1" : "value1" } ) )
2021-08-13 00:44:58 +02:00
require . NoError ( t , exp . Export ( ctx , testResource , oneRecord ) )
2021-06-11 22:25:56 +02:00
defer func ( ) {
_ = exp . Shutdown ( ctx )
} ( )
headers := mc . getHeaders ( )
require . Len ( t , headers . Get ( "header1" ) , 1 )
assert . Equal ( t , "value1" , headers . Get ( "header1" ) [ 0 ] )
}
func TestNewExporter_WithTimeout ( t * testing . T ) {
tts := [ ] struct {
name string
fn func ( exp * otlpmetric . Exporter ) error
timeout time . Duration
metrics int
spans int
code codes . Code
delay bool
} {
{
name : "Timeout Metrics" ,
fn : func ( exp * otlpmetric . Exporter ) error {
2021-08-13 00:44:58 +02:00
return exp . Export ( context . Background ( ) , testResource , oneRecord )
2021-06-11 22:25:56 +02:00
} ,
timeout : time . Millisecond * 100 ,
code : codes . DeadlineExceeded ,
delay : true ,
} ,
{
name : "No Timeout Metrics" ,
fn : func ( exp * otlpmetric . Exporter ) error {
2021-08-13 00:44:58 +02:00
return exp . Export ( context . Background ( ) , testResource , oneRecord )
2021-06-11 22:25:56 +02:00
} ,
timeout : time . Minute ,
metrics : 1 ,
code : codes . OK ,
} ,
}
for _ , tt := range tts {
t . Run ( tt . name , func ( t * testing . T ) {
mc := runMockCollector ( t )
if tt . delay {
mc . metricSvc . delay = time . Second * 10
}
defer func ( ) {
_ = mc . stop ( )
} ( )
ctx := context . Background ( )
exp := newGRPCExporter ( t , ctx , mc . endpoint , otlpmetricgrpc . WithTimeout ( tt . timeout ) , otlpmetricgrpc . WithRetry ( otlpmetricgrpc . RetrySettings { Enabled : false } ) )
defer func ( ) {
_ = exp . Shutdown ( ctx )
} ( )
err := tt . fn ( exp )
if tt . code == codes . OK {
require . NoError ( t , err )
} else {
require . Error ( t , err )
}
s := status . Convert ( err )
require . Equal ( t , tt . code , s . Code ( ) )
require . Len ( t , mc . getMetrics ( ) , tt . metrics )
} )
}
}
func TestNewExporter_withInvalidSecurityConfiguration ( t * testing . T ) {
mc := runMockCollector ( t )
defer func ( ) {
_ = mc . stop ( )
} ( )
ctx := context . Background ( )
client := otlpmetricgrpc . NewClient ( otlpmetricgrpc . WithEndpoint ( mc . endpoint ) )
exp , err := otlpmetric . New ( ctx , client )
if err != nil {
t . Fatalf ( "failed to create a new collector exporter: %v" , err )
}
2021-08-13 00:44:58 +02:00
err = exp . Export ( ctx , testResource , oneRecord )
2021-06-11 22:25:56 +02:00
expectedErr := fmt . Sprintf ( "metrics exporter is disconnected from the server %s: grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)" , mc . endpoint )
require . Error ( t , err )
require . Equal ( t , expectedErr , err . Error ( ) )
defer func ( ) {
_ = exp . Shutdown ( ctx )
} ( )
}
func TestDisconnected ( t * testing . T ) {
ctx := context . Background ( )
// The endpoint is whatever, we want to be disconnected. But we
// setting a blocking connection, so dialing to the invalid
// endpoint actually fails.
exp := newGRPCExporter ( t , ctx , "invalid" ,
otlpmetricgrpc . WithReconnectionPeriod ( time . Hour ) ,
otlpmetricgrpc . WithDialOption (
grpc . WithBlock ( ) ,
grpc . FailOnNonTempDialError ( true ) ,
) ,
)
defer func ( ) {
assert . NoError ( t , exp . Shutdown ( ctx ) )
} ( )
2021-08-13 00:44:58 +02:00
assert . Error ( t , exp . Export ( ctx , testResource , oneRecord ) )
2021-06-11 22:25:56 +02:00
}
func TestEmptyData ( t * testing . T ) {
mc := runMockCollectorAtEndpoint ( t , "localhost:56561" )
defer func ( ) {
_ = mc . stop ( )
} ( )
<- time . After ( 5 * time . Millisecond )
ctx := context . Background ( )
exp := newGRPCExporter ( t , ctx , mc . endpoint )
defer func ( ) {
assert . NoError ( t , exp . Shutdown ( ctx ) )
} ( )
2021-09-27 17:51:47 +02:00
assert . NoError ( t , exp . Export ( ctx , testResource , otlpmetrictest . EmptyReader ( ) ) )
2021-06-11 22:25:56 +02:00
}
func TestFailedMetricTransform ( t * testing . T ) {
mc := runMockCollectorAtEndpoint ( t , "localhost:56561" )
defer func ( ) {
_ = mc . stop ( )
} ( )
<- time . After ( 5 * time . Millisecond )
ctx := context . Background ( )
exp := newGRPCExporter ( t , ctx , mc . endpoint )
defer func ( ) {
assert . NoError ( t , exp . Shutdown ( ctx ) )
} ( )
2021-09-27 17:51:47 +02:00
assert . Error ( t , exp . Export ( ctx , testResource , otlpmetrictest . FailReader { } ) )
2021-06-11 22:25:56 +02:00
}