mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2024-12-14 10:13:10 +02:00
ebc245b388
* Add check to sum transform for unknown NumberKind * Initial batching * Move CheckpointSet transform to internal package * Add tests for the Exporter Export method Check batching and general output exporter ResourceMetrics are correct. * Check errors in tests * Apply suggestions from code review Co-Authored-By: Krzesimir Nowak <qdlacz@gmail.com> * Use var instead of multiple calls for group IDs * Fix otlp metric test reporting Co-authored-by: Krzesimir Nowak <qdlacz@gmail.com>
345 lines
9.8 KiB
Go
345 lines
9.8 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// Package transform provides translations for opentelemetry-go concepts and
|
|
// structures to otlp structures.
|
|
package transform
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
|
|
metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
|
|
resourcepb "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1"
|
|
|
|
"go.opentelemetry.io/otel/api/core"
|
|
"go.opentelemetry.io/otel/api/metric"
|
|
export "go.opentelemetry.io/otel/sdk/export/metric"
|
|
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
|
"go.opentelemetry.io/otel/sdk/resource"
|
|
)
|
|
|
|
var (
|
|
// ErrUnimplementedAgg is returned when a transformation of an unimplemented
|
|
// aggregator is attempted.
|
|
ErrUnimplementedAgg = errors.New("unimplemented aggregator")
|
|
|
|
// ErrUnknownValueType is returned when a transformation of an unknown value
|
|
// is attempted.
|
|
ErrUnknownValueType = errors.New("invalid value type")
|
|
|
|
// ErrContextCanceled is returned when a context cancellation halts a
|
|
// transformation.
|
|
ErrContextCanceled = errors.New("context canceled")
|
|
|
|
// ErrTransforming is returned when an unexected error is encoutered transforming.
|
|
ErrTransforming = errors.New("transforming failed")
|
|
)
|
|
|
|
// result is the product of transforming Records into OTLP Metrics.
|
|
type result struct {
|
|
Resource resource.Resource
|
|
Library string
|
|
Metric *metricpb.Metric
|
|
Err error
|
|
}
|
|
|
|
// CheckpointSet transforms all records contained in a checkpoint into
|
|
// batched OTLP ResourceMetrics.
|
|
func CheckpointSet(ctx context.Context, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
|
|
records, errc := source(ctx, cps)
|
|
|
|
// Start a fixed number of goroutines to transform records.
|
|
transformed := make(chan result)
|
|
var wg sync.WaitGroup
|
|
wg.Add(int(numWorkers))
|
|
for i := uint(0); i < numWorkers; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
transformer(ctx, records, transformed)
|
|
}()
|
|
}
|
|
go func() {
|
|
wg.Wait()
|
|
close(transformed)
|
|
}()
|
|
|
|
// Synchronously collect the transformed records and transmit.
|
|
rms, err := sink(ctx, transformed)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// source is complete, check for any errors.
|
|
if err := <-errc; err != nil {
|
|
return nil, err
|
|
}
|
|
return rms, nil
|
|
}
|
|
|
|
// source starts a goroutine that sends each one of the Records yielded by
|
|
// the CheckpointSet on the returned chan. Any error encoutered will be sent
|
|
// on the returned error chan after seeding is complete.
|
|
func source(ctx context.Context, cps export.CheckpointSet) (<-chan export.Record, <-chan error) {
|
|
errc := make(chan error, 1)
|
|
out := make(chan export.Record)
|
|
// Seed records into process.
|
|
go func() {
|
|
defer close(out)
|
|
// No select is needed since errc is buffered.
|
|
errc <- cps.ForEach(func(r export.Record) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ErrContextCanceled
|
|
case out <- r:
|
|
}
|
|
return nil
|
|
})
|
|
}()
|
|
return out, errc
|
|
}
|
|
|
|
// transformer transforms records read from the passed in chan into
|
|
// OTLP Metrics which are sent on the out chan.
|
|
func transformer(ctx context.Context, in <-chan export.Record, out chan<- result) {
|
|
for r := range in {
|
|
m, err := Record(r)
|
|
// Propagate errors, but do not send empty results.
|
|
if err == nil && m == nil {
|
|
continue
|
|
}
|
|
res := result{
|
|
Resource: r.Descriptor().Resource(),
|
|
Library: r.Descriptor().LibraryName(),
|
|
Metric: m,
|
|
Err: err,
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case out <- res:
|
|
}
|
|
}
|
|
}
|
|
|
|
// sink collects transformed Records and batches them.
|
|
//
|
|
// Any errors encoutered transforming input will be reported with an
|
|
// ErrTransforming as well as the completed ResourceMetrics. It is up to the
|
|
// caller to handle any incorrect data in these ResourceMetrics.
|
|
func sink(ctx context.Context, in <-chan result) ([]*metricpb.ResourceMetrics, error) {
|
|
var errStrings []string
|
|
|
|
type resourceBatch struct {
|
|
Resource *resourcepb.Resource
|
|
// Group by instrumentation library name and then the MetricDescriptor.
|
|
InstrumentationLibraryBatches map[string]map[string]*metricpb.Metric
|
|
}
|
|
|
|
// group by unique Resource string.
|
|
grouped := make(map[string]resourceBatch)
|
|
for res := range in {
|
|
if res.Err != nil {
|
|
errStrings = append(errStrings, res.Err.Error())
|
|
continue
|
|
}
|
|
|
|
rID := res.Resource.String()
|
|
rb, ok := grouped[rID]
|
|
if !ok {
|
|
rb = resourceBatch{
|
|
Resource: Resource(&res.Resource),
|
|
InstrumentationLibraryBatches: make(map[string]map[string]*metricpb.Metric),
|
|
}
|
|
grouped[rID] = rb
|
|
}
|
|
|
|
mb, ok := rb.InstrumentationLibraryBatches[res.Library]
|
|
if !ok {
|
|
mb = make(map[string]*metricpb.Metric)
|
|
rb.InstrumentationLibraryBatches[res.Library] = mb
|
|
}
|
|
|
|
mID := res.Metric.GetMetricDescriptor().String()
|
|
m, ok := mb[mID]
|
|
if !ok {
|
|
mb[mID] = res.Metric
|
|
continue
|
|
}
|
|
if len(res.Metric.Int64DataPoints) > 0 {
|
|
m.Int64DataPoints = append(m.Int64DataPoints, res.Metric.Int64DataPoints...)
|
|
}
|
|
if len(res.Metric.DoubleDataPoints) > 0 {
|
|
m.DoubleDataPoints = append(m.DoubleDataPoints, res.Metric.DoubleDataPoints...)
|
|
}
|
|
if len(res.Metric.HistogramDataPoints) > 0 {
|
|
m.HistogramDataPoints = append(m.HistogramDataPoints, res.Metric.HistogramDataPoints...)
|
|
}
|
|
if len(res.Metric.SummaryDataPoints) > 0 {
|
|
m.SummaryDataPoints = append(m.SummaryDataPoints, res.Metric.SummaryDataPoints...)
|
|
}
|
|
}
|
|
|
|
if len(grouped) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
var rms []*metricpb.ResourceMetrics
|
|
for _, rb := range grouped {
|
|
rm := &metricpb.ResourceMetrics{Resource: rb.Resource}
|
|
for ilName, mb := range rb.InstrumentationLibraryBatches {
|
|
ilm := &metricpb.InstrumentationLibraryMetrics{
|
|
Metrics: make([]*metricpb.Metric, 0, len(mb)),
|
|
}
|
|
if ilName != "" {
|
|
ilm.InstrumentationLibrary = &commonpb.InstrumentationLibrary{Name: ilName}
|
|
}
|
|
for _, m := range mb {
|
|
ilm.Metrics = append(ilm.Metrics, m)
|
|
}
|
|
rm.InstrumentationLibraryMetrics = append(rm.InstrumentationLibraryMetrics, ilm)
|
|
}
|
|
rms = append(rms, rm)
|
|
}
|
|
|
|
// Report any transform errors.
|
|
if len(errStrings) > 0 {
|
|
return rms, fmt.Errorf("%w:\n -%s", ErrTransforming, strings.Join(errStrings, "\n -"))
|
|
}
|
|
return rms, nil
|
|
}
|
|
|
|
// Record transforms a Record into an OTLP Metric. An ErrUnimplementedAgg
|
|
// error is returned if the Record Aggregator is not supported.
|
|
func Record(r export.Record) (*metricpb.Metric, error) {
|
|
d := r.Descriptor()
|
|
l := r.Labels()
|
|
switch a := r.Aggregator().(type) {
|
|
case aggregator.MinMaxSumCount:
|
|
return minMaxSumCount(d, l, a)
|
|
case aggregator.Sum:
|
|
return sum(d, l, a)
|
|
default:
|
|
return nil, fmt.Errorf("%w: %v", ErrUnimplementedAgg, a)
|
|
}
|
|
}
|
|
|
|
// sum transforms a Sum Aggregator into an OTLP Metric.
|
|
func sum(desc *metric.Descriptor, labels export.Labels, a aggregator.Sum) (*metricpb.Metric, error) {
|
|
sum, err := a.Sum()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m := &metricpb.Metric{
|
|
MetricDescriptor: &metricpb.MetricDescriptor{
|
|
Name: desc.Name(),
|
|
Description: desc.Description(),
|
|
Unit: string(desc.Unit()),
|
|
Labels: stringKeyValues(labels.Iter()),
|
|
},
|
|
}
|
|
|
|
switch n := desc.NumberKind(); n {
|
|
case core.Int64NumberKind, core.Uint64NumberKind:
|
|
m.MetricDescriptor.Type = metricpb.MetricDescriptor_COUNTER_INT64
|
|
m.Int64DataPoints = []*metricpb.Int64DataPoint{
|
|
{Value: sum.CoerceToInt64(n)},
|
|
}
|
|
case core.Float64NumberKind:
|
|
m.MetricDescriptor.Type = metricpb.MetricDescriptor_COUNTER_DOUBLE
|
|
m.DoubleDataPoints = []*metricpb.DoubleDataPoint{
|
|
{Value: sum.CoerceToFloat64(n)},
|
|
}
|
|
default:
|
|
return nil, fmt.Errorf("%w: %v", ErrUnknownValueType, n)
|
|
}
|
|
|
|
return m, nil
|
|
}
|
|
|
|
// minMaxSumCountValue returns the values of the MinMaxSumCount Aggregator
|
|
// as discret values.
|
|
func minMaxSumCountValues(a aggregator.MinMaxSumCount) (min, max, sum core.Number, count int64, err error) {
|
|
if min, err = a.Min(); err != nil {
|
|
return
|
|
}
|
|
if max, err = a.Max(); err != nil {
|
|
return
|
|
}
|
|
if sum, err = a.Sum(); err != nil {
|
|
return
|
|
}
|
|
if count, err = a.Count(); err != nil {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// minMaxSumCount transforms a MinMaxSumCount Aggregator into an OTLP Metric.
|
|
func minMaxSumCount(desc *metric.Descriptor, labels export.Labels, a aggregator.MinMaxSumCount) (*metricpb.Metric, error) {
|
|
min, max, sum, count, err := minMaxSumCountValues(a)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
numKind := desc.NumberKind()
|
|
return &metricpb.Metric{
|
|
MetricDescriptor: &metricpb.MetricDescriptor{
|
|
Name: desc.Name(),
|
|
Description: desc.Description(),
|
|
Unit: string(desc.Unit()),
|
|
Type: metricpb.MetricDescriptor_SUMMARY,
|
|
Labels: stringKeyValues(labels.Iter()),
|
|
},
|
|
SummaryDataPoints: []*metricpb.SummaryDataPoint{
|
|
{
|
|
Count: uint64(count),
|
|
Sum: sum.CoerceToFloat64(numKind),
|
|
PercentileValues: []*metricpb.SummaryDataPoint_ValueAtPercentile{
|
|
{
|
|
Percentile: 0.0,
|
|
Value: min.CoerceToFloat64(numKind),
|
|
},
|
|
{
|
|
Percentile: 100.0,
|
|
Value: max.CoerceToFloat64(numKind),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// stringKeyValues transforms a label iterator into an OTLP StringKeyValues.
|
|
func stringKeyValues(iter export.LabelIterator) []*commonpb.StringKeyValue {
|
|
l := iter.Len()
|
|
if l == 0 {
|
|
return nil
|
|
}
|
|
result := make([]*commonpb.StringKeyValue, 0, l)
|
|
for iter.Next() {
|
|
kv := iter.Label()
|
|
result = append(result, &commonpb.StringKeyValue{
|
|
Key: string(kv.Key),
|
|
Value: kv.Value.Emit(),
|
|
})
|
|
}
|
|
return result
|
|
}
|