
Add metrics support to the OTLP exporter (#544)

* Initial metrics addition to the OTLP exporter

* Fixes

Update to incorporate merged changes.
Fix lint issues.

* Add sum float64 transform unit test

* Fix static check

* Update comments

Fix malformed License header.
Add documentation for new transform functions.
Remove errant TODO.

* Fix test failures and handle ErrEmptyDataSet

Use `assert.NoError` instead of `assert.Nil` to correctly display
checked errors.

Use the result of `assert.NoError` to guard against `nil` pointer
dereferences, as sketched below.

Add a check to skip `Record`s that return an `ErrEmptyDataSet` error and
include a test verifying this error is correctly returned from the
transform package.
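
For illustration, the `assert.NoError` guard used throughout the new transform tests looks roughly like this (a fragment echoing the tests added below, with `expected` standing in for the table-driven `test.expected`; it is not a complete test):

	got, err := minMaxSumCount(desc, labels, mmsc)
	if assert.NoError(t, err) {
		// Only dereference got once the error assertion has passed; this
		// avoids a nil pointer dereference when the transform fails.
		assert.Equal(t, expected, got.MetricDescriptor)
	}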

Co-authored-by: Rahul Patel <rahulpa@google.com>
Tyler Yahn 2020-03-13 11:42:20 -07:00 committed by GitHub
parent fcc4aca8c7
commit cba1664b46
6 changed files with 784 additions and 33 deletions

@@ -0,0 +1,143 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package transform provides translations from opentelemetry-go concepts and
// structures to OTLP structures.
package transform
import (
"errors"
commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
"go.opentelemetry.io/otel/api/core"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
)
// ErrUnimplementedAgg is returned when a transformation of an unimplemented
// aggregator is attempted.
var ErrUnimplementedAgg = errors.New("unimplemented aggregator")
// Record transforms a Record into an OTLP Metric. An ErrUnimplementedAgg
// error is returned if the Record Aggregator is not supported.
func Record(r metricsdk.Record) (*metricpb.Metric, error) {
d := r.Descriptor()
l := r.Labels()
switch a := r.Aggregator().(type) {
case aggregator.MinMaxSumCount:
return minMaxSumCount(d, l, a)
case aggregator.Sum:
return sum(d, l, a)
}
return nil, ErrUnimplementedAgg
}
// sum transforms a Sum Aggregator into an OTLP Metric.
func sum(desc *metricsdk.Descriptor, labels metricsdk.Labels, a aggregator.Sum) (*metricpb.Metric, error) {
sum, err := a.Sum()
if err != nil {
return nil, err
}
m := &metricpb.Metric{
MetricDescriptor: &metricpb.MetricDescriptor{
Name: desc.Name(),
Description: desc.Description(),
Unit: string(desc.Unit()),
Labels: stringKeyValues(labels.Ordered()),
},
}
switch n := desc.NumberKind(); n {
case core.Int64NumberKind, core.Uint64NumberKind:
m.MetricDescriptor.Type = metricpb.MetricDescriptor_COUNTER_INT64
m.Int64Datapoints = []*metricpb.Int64DataPoint{
{Value: sum.CoerceToInt64(n)},
}
case core.Float64NumberKind:
m.MetricDescriptor.Type = metricpb.MetricDescriptor_COUNTER_DOUBLE
m.DoubleDatapoints = []*metricpb.DoubleDataPoint{
{Value: sum.CoerceToFloat64(n)},
}
}
return m, nil
}
// minMaxSumCountValues returns the values of the MinMaxSumCount Aggregator
// as discrete values.
func minMaxSumCountValues(a aggregator.MinMaxSumCount) (min, max, sum core.Number, count int64, err error) {
if min, err = a.Min(); err != nil {
return
}
if max, err = a.Max(); err != nil {
return
}
if sum, err = a.Sum(); err != nil {
return
}
if count, err = a.Count(); err != nil {
return
}
return
}
// minMaxSumCount transforms a MinMaxSumCount Aggregator into an OTLP Metric.
func minMaxSumCount(desc *metricsdk.Descriptor, labels metricsdk.Labels, a aggregator.MinMaxSumCount) (*metricpb.Metric, error) {
min, max, sum, count, err := minMaxSumCountValues(a)
if err != nil {
return nil, err
}
numKind := desc.NumberKind()
return &metricpb.Metric{
MetricDescriptor: &metricpb.MetricDescriptor{
Name: desc.Name(),
Description: desc.Description(),
Unit: string(desc.Unit()),
Type: metricpb.MetricDescriptor_SUMMARY,
Labels: stringKeyValues(labels.Ordered()),
},
SummaryDatapoints: []*metricpb.SummaryDataPoint{
{
Count: uint64(count),
Sum: sum.CoerceToFloat64(numKind),
PercentileValues: []*metricpb.SummaryDataPoint_ValueAtPercentile{
{
Percentile: 0.0,
Value: min.CoerceToFloat64(numKind),
},
{
Percentile: 100.0,
Value: max.CoerceToFloat64(numKind),
},
},
},
},
}, nil
}
// stringKeyValues transforms a KeyValues into an OTLP StringKeyValues.
func stringKeyValues(kvs []core.KeyValue) []*commonpb.StringKeyValue {
result := make([]*commonpb.StringKeyValue, 0, len(kvs))
for _, kv := range kvs {
result = append(result, &commonpb.StringKeyValue{
Key: string(kv.Key),
Value: kv.Value.Emit(),
})
}
return result
}

@@ -0,0 +1,279 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transform
import (
"context"
"testing"
commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/unit"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
sumAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
)
func TestStringKeyValues(t *testing.T) {
tests := []struct {
kvs []core.KeyValue
expected []*commonpb.StringKeyValue
}{
{
[]core.KeyValue{},
[]*commonpb.StringKeyValue{},
},
{
[]core.KeyValue{
core.Key("true").Bool(true),
core.Key("one").Int64(1),
core.Key("two").Uint64(2),
core.Key("three").Float64(3),
core.Key("four").Int32(4),
core.Key("five").Uint32(5),
core.Key("six").Float32(6),
core.Key("seven").Int(7),
core.Key("eight").Uint(8),
core.Key("the").String("final word"),
},
[]*commonpb.StringKeyValue{
{Key: "true", Value: "true"},
{Key: "one", Value: "1"},
{Key: "two", Value: "2"},
{Key: "three", Value: "3"},
{Key: "four", Value: "4"},
{Key: "five", Value: "5"},
{Key: "six", Value: "6"},
{Key: "seven", Value: "7"},
{Key: "eight", Value: "8"},
{Key: "the", Value: "final word"},
},
},
}
for _, test := range tests {
assert.Equal(t, test.expected, stringKeyValues(test.kvs))
}
}
func TestMinMaxSumCountValue(t *testing.T) {
mmsc := minmaxsumcount.New(&metricsdk.Descriptor{})
assert.NoError(t, mmsc.Update(context.Background(), 1, &metricsdk.Descriptor{}))
assert.NoError(t, mmsc.Update(context.Background(), 10, &metricsdk.Descriptor{}))
// Prior to checkpointing, ErrEmptyDataSet should be returned.
_, _, _, _, err := minMaxSumCountValues(mmsc)
assert.Error(t, err, aggregator.ErrEmptyDataSet)
// Checkpoint to set non-zero values
mmsc.Checkpoint(context.Background(), &metricsdk.Descriptor{})
min, max, sum, count, err := minMaxSumCountValues(mmsc)
if assert.NoError(t, err) {
assert.Equal(t, min, core.NewInt64Number(1))
assert.Equal(t, max, core.NewInt64Number(10))
assert.Equal(t, sum, core.NewInt64Number(11))
assert.Equal(t, count, int64(2))
}
}
func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
tests := []struct {
name string
metricKind metricsdk.Kind
keys []core.Key
description string
unit unit.Unit
numberKind core.NumberKind
labels []core.KeyValue
expected *metricpb.MetricDescriptor
}{
{
"mmsc-test-a",
metricsdk.MeasureKind,
[]core.Key{},
"test-a-description",
unit.Dimensionless,
core.Int64NumberKind,
[]core.KeyValue{},
&metricpb.MetricDescriptor{
Name: "mmsc-test-a",
Description: "test-a-description",
Unit: "1",
Type: metricpb.MetricDescriptor_SUMMARY,
Labels: []*commonpb.StringKeyValue{},
},
},
{
"mmsc-test-b",
metricsdk.CounterKind, // This shouldn't change anything.
[]core.Key{"test"}, // This shouldn't change anything.
"test-b-description",
unit.Bytes,
core.Float64NumberKind, // This shouldn't change anything.
[]core.KeyValue{core.Key("A").String("1")},
&metricpb.MetricDescriptor{
Name: "mmsc-test-b",
Description: "test-b-description",
Unit: "By",
Type: metricpb.MetricDescriptor_SUMMARY,
Labels: []*commonpb.StringKeyValue{{Key: "A", Value: "1"}},
},
},
}
ctx := context.Background()
mmsc := minmaxsumcount.New(&metricsdk.Descriptor{})
if !assert.NoError(t, mmsc.Update(ctx, 1, &metricsdk.Descriptor{})) {
return
}
mmsc.Checkpoint(ctx, &metricsdk.Descriptor{})
for _, test := range tests {
desc := metricsdk.NewDescriptor(test.name, test.metricKind, test.keys, test.description, test.unit, test.numberKind)
labels := metricsdk.NewLabels(test.labels, "", nil)
got, err := minMaxSumCount(desc, labels, mmsc)
if assert.NoError(t, err) {
assert.Equal(t, test.expected, got.MetricDescriptor)
}
}
}
func TestMinMaxSumCountDatapoints(t *testing.T) {
desc := metricsdk.NewDescriptor("", metricsdk.MeasureKind, []core.Key{}, "", unit.Dimensionless, core.Int64NumberKind)
labels := metricsdk.NewLabels([]core.KeyValue{}, "", nil)
mmsc := minmaxsumcount.New(desc)
assert.NoError(t, mmsc.Update(context.Background(), 1, desc))
assert.NoError(t, mmsc.Update(context.Background(), 10, desc))
mmsc.Checkpoint(context.Background(), desc)
expected := []*metricpb.SummaryDataPoint{
{
Count: 2,
Sum: 11,
PercentileValues: []*metricpb.SummaryDataPoint_ValueAtPercentile{
{
Percentile: 0.0,
Value: 1,
},
{
Percentile: 100.0,
Value: 10,
},
},
},
}
m, err := minMaxSumCount(desc, labels, mmsc)
if assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64Datapoints)
assert.Equal(t, []*metricpb.DoubleDataPoint(nil), m.DoubleDatapoints)
assert.Equal(t, []*metricpb.HistogramDataPoint(nil), m.HistogramDatapoints)
assert.Equal(t, expected, m.SummaryDatapoints)
}
}
func TestMinMaxSumCountPropagatesErrors(t *testing.T) {
// ErrEmptyDataSet should be returned by both the Min and Max values of
// a MinMaxSumCount Aggregator. Use this fact to check the error is
// correctly returned.
mmsc := minmaxsumcount.New(&metricsdk.Descriptor{})
_, _, _, _, err := minMaxSumCountValues(mmsc)
assert.Error(t, err)
assert.Equal(t, aggregator.ErrEmptyDataSet, err)
}
func TestSumMetricDescriptor(t *testing.T) {
tests := []struct {
name string
metricKind metricsdk.Kind
keys []core.Key
description string
unit unit.Unit
numberKind core.NumberKind
labels []core.KeyValue
expected *metricpb.MetricDescriptor
}{
{
"sum-test-a",
metricsdk.CounterKind,
[]core.Key{},
"test-a-description",
unit.Dimensionless,
core.Int64NumberKind,
[]core.KeyValue{},
&metricpb.MetricDescriptor{
Name: "sum-test-a",
Description: "test-a-description",
Unit: "1",
Type: metricpb.MetricDescriptor_COUNTER_INT64,
Labels: []*commonpb.StringKeyValue{},
},
},
{
"sum-test-b",
metricsdk.MeasureKind, // This shouldn't change anything.
[]core.Key{"test"}, // This shouldn't change anything.
"test-b-description",
unit.Milliseconds,
core.Float64NumberKind,
[]core.KeyValue{core.Key("A").String("1")},
&metricpb.MetricDescriptor{
Name: "sum-test-b",
Description: "test-b-description",
Unit: "ms",
Type: metricpb.MetricDescriptor_COUNTER_DOUBLE,
Labels: []*commonpb.StringKeyValue{{Key: "A", Value: "1"}},
},
},
}
for _, test := range tests {
desc := metricsdk.NewDescriptor(test.name, test.metricKind, test.keys, test.description, test.unit, test.numberKind)
labels := metricsdk.NewLabels(test.labels, "", nil)
got, err := sum(desc, labels, sumAgg.New())
if assert.NoError(t, err) {
assert.Equal(t, test.expected, got.MetricDescriptor)
}
}
}
func TestSumInt64Datapoints(t *testing.T) {
desc := metricsdk.NewDescriptor("", metricsdk.MeasureKind, []core.Key{}, "", unit.Dimensionless, core.Int64NumberKind)
labels := metricsdk.NewLabels([]core.KeyValue{}, "", nil)
s := sumAgg.New()
assert.NoError(t, s.Update(context.Background(), core.Number(1), desc))
s.Checkpoint(context.Background(), desc)
if m, err := sum(desc, labels, s); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint{{Value: 1}}, m.Int64Datapoints)
assert.Equal(t, []*metricpb.DoubleDataPoint(nil), m.DoubleDatapoints)
assert.Equal(t, []*metricpb.HistogramDataPoint(nil), m.HistogramDatapoints)
assert.Equal(t, []*metricpb.SummaryDataPoint(nil), m.SummaryDatapoints)
}
}
func TestSumFloat64Datapoints(t *testing.T) {
desc := metricsdk.NewDescriptor("", metricsdk.MeasureKind, []core.Key{}, "", unit.Dimensionless, core.Float64NumberKind)
labels := metricsdk.NewLabels([]core.KeyValue{}, "", nil)
s := sumAgg.New()
assert.NoError(t, s.Update(context.Background(), core.NewFloat64Number(1), desc))
s.Checkpoint(context.Background(), desc)
if m, err := sum(desc, labels, s); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64Datapoints)
assert.Equal(t, []*metricpb.DoubleDataPoint{{Value: 1}}, m.DoubleDatapoints)
assert.Equal(t, []*metricpb.HistogramDataPoint(nil), m.HistogramDatapoints)
assert.Equal(t, []*metricpb.SummaryDataPoint(nil), m.SummaryDatapoints)
}
}

@@ -24,36 +24,79 @@ import (
"google.golang.org/grpc"
colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1"
coltracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/trace/v1"
metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
tracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1"
)
func makeMockCollector(t *testing.T) *mockCol {
return &mockCol{t: t, wg: new(sync.WaitGroup)}
return &mockCol{
t: t,
traceSvc: &mockTraceService{},
metricSvc: &mockMetricService{},
}
}
type mockTraceService struct {
mu sync.RWMutex
spans []*tracepb.Span
}
func (mts *mockTraceService) getSpans() []*tracepb.Span {
mts.mu.RLock()
spans := append([]*tracepb.Span{}, mts.spans...)
mts.mu.RUnlock()
return spans
}
func (mts *mockTraceService) Export(ctx context.Context, exp *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) {
resourceSpans := exp.GetResourceSpans()
// TODO (rghetia): handle Resources
mts.mu.Lock()
for _, rs := range resourceSpans {
mts.spans = append(mts.spans, rs.Spans...)
}
mts.mu.Unlock()
return &coltracepb.ExportTraceServiceResponse{}, nil
}
type mockMetricService struct {
mu sync.RWMutex
metrics []*metricpb.Metric
}
func (mms *mockMetricService) getMetrics() []*metricpb.Metric {
// Copy so the returned slice cannot be used to mutate the stored metrics.
m := make([]*metricpb.Metric, 0, len(mms.metrics))
mms.mu.RLock()
defer mms.mu.RUnlock()
return append(m, mms.metrics...)
}
func (mms *mockMetricService) Export(ctx context.Context, exp *colmetricpb.ExportMetricsServiceRequest) (*colmetricpb.ExportMetricsServiceResponse, error) {
mms.mu.Lock()
for _, rm := range exp.GetResourceMetrics() {
mms.metrics = append(mms.metrics, rm.Metrics...)
}
mms.mu.Unlock()
return &colmetricpb.ExportMetricsServiceResponse{}, nil
}
type mockCol struct {
t *testing.T
spans []*tracepb.Span
mu sync.Mutex
wg *sync.WaitGroup
traceSvc *mockTraceService
metricSvc *mockMetricService
address string
stopFunc func() error
stopOnce sync.Once
}
var _ coltracepb.TraceServiceServer = (*mockCol)(nil)
func (mc *mockCol) Export(ctx context.Context, exp *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) {
resourceSpans := exp.GetResourceSpans()
// TODO (rghetia): handle Resources
for _, rs := range resourceSpans {
mc.spans = append(mc.spans, rs.Spans...)
}
return &coltracepb.ExportTraceServiceResponse{}, nil
}
var _ coltracepb.TraceServiceServer = (*mockTraceService)(nil)
var _ colmetricpb.MetricsServiceServer = (*mockMetricService)(nil)
var errAlreadyStopped = fmt.Errorf("already stopped")
@@ -66,12 +109,35 @@ func (mc *mockCol) stop() error {
})
// Give it some time to shut down.
<-time.After(160 * time.Millisecond)
mc.mu.Lock()
mc.wg.Wait()
mc.mu.Unlock()
// Wait for services to finish reading/writing.
var wg sync.WaitGroup
wg.Add(1)
go func() {
// Getting the lock ensures the traceSvc is done flushing.
mc.traceSvc.mu.Lock()
defer mc.traceSvc.mu.Unlock()
wg.Done()
}()
wg.Add(1)
go func() {
// Getting the lock ensures the metricSvc is done flushing.
mc.metricSvc.mu.Lock()
defer mc.metricSvc.mu.Unlock()
wg.Done()
}()
wg.Wait()
return err
}
func (mc *mockCol) getSpans() []*tracepb.Span {
return mc.traceSvc.getSpans()
}
func (mc *mockCol) getMetrics() []*metricpb.Metric {
return mc.metricSvc.getMetrics()
}
// runMockCol is a helper function to create a mockCol
func runMockCol(t *testing.T) *mockCol {
return runMockColAtAddr(t, "localhost:0")
@@ -85,7 +151,8 @@ func runMockColAtAddr(t *testing.T, addr string) *mockCol {
srv := grpc.NewServer()
mc := makeMockCollector(t)
coltracepb.RegisterTraceServiceServer(srv, mc)
coltracepb.RegisterTraceServiceServer(srv, mc.traceSvc)
colmetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc)
go func() {
_ = srv.Serve(ln)
}()
@@ -102,11 +169,3 @@ func runMockColAtAddr(t *testing.T, addr string) *mockCol {
return mc
}
func (mc *mockCol) getSpans() []*tracepb.Span {
mc.mu.Lock()
spans := append([]*tracepb.Span{}, mc.spans...)
mc.mu.Unlock()
return spans
}

@@ -24,6 +24,7 @@ import (
const (
DefaultCollectorPort uint16 = 55678
DefaultCollectorHost string = "localhost"
DefaultNumWorkers uint = 1
)
type ExporterOption func(*Config)
@@ -36,6 +37,17 @@ type Config struct {
grpcDialOptions []grpc.DialOption
headers map[string]string
clientCredentials credentials.TransportCredentials
numWorkers uint
}
// WorkerCount sets the number of Goroutines to use when processing telemetry.
func WorkerCount(n uint) ExporterOption {
if n == 0 {
n = DefaultNumWorkers
}
return func(cfg *Config) {
cfg.numWorkers = n
}
}
// WithInsecure disables client transport security for the exporter's gRPC connection
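
For reference, a minimal sketch of constructing the exporter with the new option; the helper name `newMetricsExporter` and the worker count of 4 are illustrative, and the collector address is left at its default:

	import (
		"log"

		"go.opentelemetry.io/otel/exporters/otlp"
	)

	func newMetricsExporter() *otlp.Exporter {
		exp, err := otlp.NewExporter(
			otlp.WithInsecure(),
			otlp.WorkerCount(4), // process metric records with 4 goroutines
		)
		if err != nil {
			log.Fatalf("failed to create OTLP exporter: %v", err)
		}
		return exp
	}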

@@ -19,25 +19,32 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"unsafe"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1"
coltracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/trace/v1"
metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
tracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1"
export "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
)
type Exporter struct {
// mu protects the non-atomic and non-channel variables
mu sync.RWMutex
// senderMu protects the concurrent unsafe send on traceExporter client
// senderMu protects the concurrent unsafe sends on the shared gRPC client connection.
senderMu sync.Mutex
started bool
traceExporter coltracepb.TraceServiceClient
metricExporter colmetricpb.MetricsServiceClient
grpcClientConn *grpc.ClientConn
lastConnectErrPtr unsafe.Pointer
@@ -50,7 +57,8 @@ type Exporter struct {
c Config
}
var _ export.SpanBatcher = (*Exporter)(nil)
var _ tracesdk.SpanBatcher = (*Exporter)(nil)
var _ metricsdk.Exporter = (*Exporter)(nil)
func configureOptions(cfg *Config, opts ...ExporterOption) {
for _, opt := range opts {
@@ -68,7 +76,7 @@ func NewExporter(opts ...ExporterOption) (*Exporter, error) {
func NewUnstartedExporter(opts ...ExporterOption) *Exporter {
e := new(Exporter)
e.c = Config{}
e.c = Config{numWorkers: DefaultNumWorkers}
configureOptions(&e.c, opts...)
// TODO (rghetia): add resources
@@ -142,6 +150,7 @@ func (e *Exporter) enableConnections(cc *grpc.ClientConn) error {
}
e.grpcClientConn = cc
e.traceExporter = coltracepb.NewTraceServiceClient(cc)
e.metricExporter = colmetricpb.NewMetricsServiceClient(cc)
e.mu.Unlock()
return nil
@@ -200,11 +209,140 @@ func (e *Exporter) Stop() error {
return err
}
func (e *Exporter) ExportSpans(ctx context.Context, sds []*export.SpanData) {
// Export implements the "go.opentelemetry.io/otel/sdk/export/metric".Exporter
// interface. It transforms metric Records into OTLP Metrics and transmits them.
func (e *Exporter) Export(ctx context.Context, cps metricsdk.CheckpointSet) error {
// Seed records into the work processing pool.
records := make(chan metricsdk.Record)
go func() {
cps.ForEach(func(record metricsdk.Record) {
select {
case <-e.stopCh:
case <-ctx.Done():
case records <- record:
}
})
close(records)
}()
// Collect all errors so they can be returned as a single error.
errCh := make(chan error)
var errStrings []string
go func() {
for err := range errCh {
if err != nil {
errStrings = append(errStrings, err.Error())
}
}
}()
// Start the work processing pool.
processed := make(chan *metricpb.Metric)
var wg sync.WaitGroup
for i := uint(0); i < e.c.numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
e.processMetrics(ctx, processed, errCh, records)
}()
}
go func() {
wg.Wait()
close(processed)
}()
// Synchronously collect the processed records and transmit them.
e.uploadMetrics(ctx, processed, errCh)
// Now that all processing is done, handle any errors seen.
close(errCh)
if len(errStrings) > 0 {
return fmt.Errorf("errors exporting:\n -%s", strings.Join(errStrings, "\n -"))
}
return nil
}
func (e *Exporter) processMetrics(ctx context.Context, out chan<- *metricpb.Metric, errCh chan<- error, in <-chan metricsdk.Record) {
for r := range in {
m, err := transform.Record(r)
if err != nil {
if err == aggregator.ErrEmptyDataSet {
// The Aggregator was checkpointed before the first value
// was set; skip this Record.
continue
}
select {
case <-e.stopCh:
return
case <-ctx.Done():
return
case errCh <- err:
continue
}
}
select {
case <-e.stopCh:
return
case <-ctx.Done():
return
case out <- m:
}
}
}
func (e *Exporter) uploadMetrics(ctx context.Context, in <-chan *metricpb.Metric, errCh chan<- error) {
var protoMetrics []*metricpb.Metric
for m := range in {
protoMetrics = append(protoMetrics, m)
}
if len(protoMetrics) == 0 {
return
}
if !e.connected() {
return
}
rm := []*metricpb.ResourceMetrics{
{
Resource: nil,
Metrics: protoMetrics,
},
}
select {
case <-e.stopCh:
return
case <-ctx.Done():
return
default:
e.senderMu.Lock()
_, err := e.metricExporter.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: rm,
})
e.senderMu.Unlock()
if err != nil {
select {
case <-e.stopCh:
return
case <-ctx.Done():
return
case errCh <- err:
}
}
}
}
func (e *Exporter) ExportSpan(ctx context.Context, sd *tracesdk.SpanData) {
e.uploadTraces(ctx, []*tracesdk.SpanData{sd})
}
func (e *Exporter) ExportSpans(ctx context.Context, sds []*tracesdk.SpanData) {
e.uploadTraces(ctx, sds)
}
func otSpanDataToPbSpans(sdl []*export.SpanData) []*tracepb.ResourceSpans {
func otSpanDataToPbSpans(sdl []*tracesdk.SpanData) []*tracepb.ResourceSpans {
if len(sdl) == 0 {
return nil
}
@@ -222,7 +360,7 @@ func otSpanDataToPbSpans(sdl []*tracesdk.SpanData) []*tracepb.ResourceSpans {
}
}
func (e *Exporter) uploadTraces(ctx context.Context, sdl []*export.SpanData) {
func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) {
select {
case <-e.stopCh:
return
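
Taken together, the metric side of the exporter is driven by a push controller; the end-to-end test below wires it up roughly as follows (abridged from that test, with the 60-second push period taken from it):

	selector := simple.NewWithExactMeasure()
	batcher := ungrouped.New(selector, true)
	pusher := push.New(batcher, exp, 60*time.Second)
	pusher.Start()
	defer pusher.Stop()
	// Instruments created from this meter are pushed through exp.Export.
	meter := pusher.Meter("test-meter")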

@@ -24,9 +24,16 @@ import (
"github.com/stretchr/testify/assert"
metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
"go.opentelemetry.io/otel/api/core"
metricapi "go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/otlp"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
export "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
@@ -90,6 +97,72 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
span.End()
}
selector := simple.NewWithExactMeasure()
batcher := ungrouped.New(selector, true)
pusher := push.New(batcher, exp, 60*time.Second)
pusher.Start()
ctx := context.Background()
meter := pusher.Meter("test-meter")
labels := meter.Labels(core.Key("test").Bool(true))
type data struct {
iKind metricsdk.Kind
nKind core.NumberKind
val int64
}
instruments := map[string]data{
"test-int64-counter": {metricsdk.CounterKind, core.Int64NumberKind, 1},
"test-float64-counter": {metricsdk.CounterKind, core.Float64NumberKind, 1},
"test-int64-measure": {metricsdk.MeasureKind, core.Int64NumberKind, 2},
"test-float64-measure": {metricsdk.MeasureKind, core.Float64NumberKind, 2},
"test-int64-observer": {metricsdk.ObserverKind, core.Int64NumberKind, 3},
"test-float64-observer": {metricsdk.ObserverKind, core.Float64NumberKind, 3},
}
for name, data := range instruments {
switch data.iKind {
case metricsdk.CounterKind:
switch data.nKind {
case core.Int64NumberKind:
metricapi.Must(meter).NewInt64Counter(name).Add(ctx, data.val, labels)
case core.Float64NumberKind:
metricapi.Must(meter).NewFloat64Counter(name).Add(ctx, float64(data.val), labels)
default:
assert.Failf(t, "unsupported number testing kind", data.nKind.String())
}
case metricsdk.MeasureKind:
switch data.nKind {
case core.Int64NumberKind:
metricapi.Must(meter).NewInt64Measure(name).Record(ctx, data.val, labels)
case core.Float64NumberKind:
metricapi.Must(meter).NewFloat64Measure(name).Record(ctx, float64(data.val), labels)
default:
assert.Failf(t, "unsupported number testing kind", data.nKind.String())
}
case metricsdk.ObserverKind:
switch data.nKind {
case core.Int64NumberKind:
callback := func(v int64) metricapi.Int64ObserverCallback {
return metricapi.Int64ObserverCallback(func(result metricapi.Int64ObserverResult) { result.Observe(v, labels) })
}(data.val)
metricapi.Must(meter).RegisterInt64Observer(name, callback)
case core.Float64NumberKind:
callback := func(v float64) metricapi.Float64ObserverCallback {
return metricapi.Float64ObserverCallback(func(result metricapi.Float64ObserverResult) { result.Observe(v, labels) })
}(float64(data.val))
metricapi.Must(meter).RegisterFloat64Observer(name, callback)
default:
assert.Failf(t, "unsupported number testing kind", data.nKind.String())
}
default:
assert.Failf(t, "unsupported metrics testing kind", data.iKind.String())
}
}
// Flush and close.
pusher.Stop()
// Wait >2 cycles.
<-time.After(40 * time.Millisecond)
// Now shutdown the exporter
@@ -115,6 +188,53 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
t.Fatalf("span attribute value: got %d, want %d", got, want)
}
}
metrics := mc.getMetrics()
assert.Len(t, metrics, len(instruments), "not enough metrics exported")
seen := make(map[string]struct{}, len(instruments))
for _, m := range metrics {
desc := m.GetMetricDescriptor()
data, ok := instruments[desc.Name]
if !ok {
assert.Failf(t, "unknown metrics", desc.Name)
continue
}
seen[desc.Name] = struct{}{}
switch data.iKind {
case metricsdk.CounterKind:
switch data.nKind {
case core.Int64NumberKind:
assert.Equal(t, metricpb.MetricDescriptor_COUNTER_INT64.String(), desc.GetType().String())
if dp := m.GetInt64Datapoints(); assert.Len(t, dp, 1) {
assert.Equal(t, data.val, dp[0].Value, "invalid value for %q", desc.Name)
}
case core.Float64NumberKind:
assert.Equal(t, metricpb.MetricDescriptor_COUNTER_DOUBLE.String(), desc.GetType().String())
if dp := m.GetDoubleDatapoints(); assert.Len(t, dp, 1) {
assert.Equal(t, float64(data.val), dp[0].Value, "invalid value for %q", desc.Name)
}
default:
assert.Failf(t, "invalid number kind", data.nKind.String())
}
case metricsdk.MeasureKind, metricsdk.ObserverKind:
assert.Equal(t, metricpb.MetricDescriptor_SUMMARY.String(), desc.GetType().String())
if dp := m.GetSummaryDatapoints(); assert.Len(t, dp, 1) {
count := dp[0].Count
assert.Equal(t, uint64(1), count, "invalid count for %q", desc.Name)
assert.Equal(t, float64(data.val*int64(count)), dp[0].Sum, "invalid sum for %q (value %d)", desc.Name, data.val)
}
default:
assert.Failf(t, "invalid metrics kind", data.iKind.String())
}
}
for i := range instruments {
if _, ok := seen[i]; !ok {
assert.Fail(t, fmt.Sprintf("no metric(s) exported for %q", i))
}
}
}
func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {