mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-20 03:30:02 +02:00
Merge branch 'master' into feature/jaeger-exporter-env
This commit is contained in:
commit
f06ae571ff
@ -27,7 +27,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/label"
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/controller/pull"
|
||||
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
|
||||
)
|
||||
@ -216,11 +216,11 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
|
||||
|
||||
desc := c.toDesc(record, labelKeys)
|
||||
|
||||
if hist, ok := agg.(aggregator.Histogram); ok {
|
||||
if hist, ok := agg.(aggregation.Histogram); ok {
|
||||
if err := c.exportHistogram(ch, hist, numberKind, desc, labels); err != nil {
|
||||
return fmt.Errorf("exporting histogram: %w", err)
|
||||
}
|
||||
} else if dist, ok := agg.(aggregator.Distribution); ok {
|
||||
} else if dist, ok := agg.(aggregation.Distribution); ok {
|
||||
// TODO: summaries values are never being resetted.
|
||||
// As measurements are recorded, new records starts to have less impact on these summaries.
|
||||
// We should implement an solution that is similar to the Prometheus Clients
|
||||
@ -232,11 +232,11 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
|
||||
if err := c.exportSummary(ch, dist, numberKind, desc, labels); err != nil {
|
||||
return fmt.Errorf("exporting summary: %w", err)
|
||||
}
|
||||
} else if sum, ok := agg.(aggregator.Sum); ok {
|
||||
} else if sum, ok := agg.(aggregation.Sum); ok {
|
||||
if err := c.exportCounter(ch, sum, numberKind, desc, labels); err != nil {
|
||||
return fmt.Errorf("exporting counter: %w", err)
|
||||
}
|
||||
} else if lastValue, ok := agg.(aggregator.LastValue); ok {
|
||||
} else if lastValue, ok := agg.(aggregation.LastValue); ok {
|
||||
if err := c.exportLastValue(ch, lastValue, numberKind, desc, labels); err != nil {
|
||||
return fmt.Errorf("exporting last value: %w", err)
|
||||
}
|
||||
@ -248,7 +248,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *collector) exportLastValue(ch chan<- prometheus.Metric, lvagg aggregator.LastValue, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
|
||||
func (c *collector) exportLastValue(ch chan<- prometheus.Metric, lvagg aggregation.LastValue, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
|
||||
lv, _, err := lvagg.LastValue()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error retrieving last value: %w", err)
|
||||
@ -263,7 +263,7 @@ func (c *collector) exportLastValue(ch chan<- prometheus.Metric, lvagg aggregato
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *collector) exportCounter(ch chan<- prometheus.Metric, sum aggregator.Sum, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
|
||||
func (c *collector) exportCounter(ch chan<- prometheus.Metric, sum aggregation.Sum, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
|
||||
v, err := sum.Sum()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error retrieving counter: %w", err)
|
||||
@ -278,7 +278,7 @@ func (c *collector) exportCounter(ch chan<- prometheus.Metric, sum aggregator.Su
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregator.Distribution, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
|
||||
func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregation.Distribution, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
|
||||
count, err := dist.Count()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error retrieving count: %w", err)
|
||||
@ -305,7 +305,7 @@ func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregator.D
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregator.Histogram, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
|
||||
func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregation.Histogram, kind metric.NumberKind, desc *prometheus.Desc, labels []string) error {
|
||||
buckets, err := hist.Histogram()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error retrieving histogram: %w", err)
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/label"
|
||||
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/controller/push"
|
||||
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
|
||||
)
|
||||
@ -98,7 +98,7 @@ func NewRawExporter(config Config) (*Exporter, error) {
|
||||
} else {
|
||||
for _, q := range config.Quantiles {
|
||||
if q < 0 || q > 1 {
|
||||
return nil, aggregator.ErrInvalidQuantile
|
||||
return nil, aggregation.ErrInvalidQuantile
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -164,7 +164,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
|
||||
|
||||
var expose expoLine
|
||||
|
||||
if sum, ok := agg.(aggregator.Sum); ok {
|
||||
if sum, ok := agg.(aggregation.Sum); ok {
|
||||
value, err := sum.Sum()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -172,7 +172,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
|
||||
expose.Sum = value.AsInterface(kind)
|
||||
}
|
||||
|
||||
if mmsc, ok := agg.(aggregator.MinMaxSumCount); ok {
|
||||
if mmsc, ok := agg.(aggregation.MinMaxSumCount); ok {
|
||||
count, err := mmsc.Count()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -191,7 +191,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
|
||||
}
|
||||
expose.Min = min.AsInterface(kind)
|
||||
|
||||
if dist, ok := agg.(aggregator.Distribution); ok && len(e.config.Quantiles) != 0 {
|
||||
if dist, ok := agg.(aggregation.Distribution); ok && len(e.config.Quantiles) != 0 {
|
||||
summary := make([]expoQuantile, len(e.config.Quantiles))
|
||||
expose.Quantiles = summary
|
||||
|
||||
@ -208,7 +208,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if lv, ok := agg.(aggregator.LastValue); ok {
|
||||
} else if lv, ok := agg.(aggregation.LastValue); ok {
|
||||
value, timestamp, err := lv.LastValue()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -29,7 +29,7 @@ import (
|
||||
"go.opentelemetry.io/otel/exporters/metric/stdout"
|
||||
"go.opentelemetry.io/otel/exporters/metric/test"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
|
||||
@ -80,7 +80,7 @@ func TestStdoutInvalidQuantile(t *testing.T) {
|
||||
Quantiles: []float64{1.1, 0.9},
|
||||
})
|
||||
require.Error(t, err, "Invalid quantile error expected")
|
||||
require.Equal(t, aggregator.ErrInvalidQuantile, err)
|
||||
require.Equal(t, aggregation.ErrInvalidQuantile, err)
|
||||
}
|
||||
|
||||
func TestStdoutTimestamp(t *testing.T) {
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/label"
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
|
||||
@ -117,7 +117,7 @@ func (p *CheckpointSet) updateAggregator(desc *metric.Descriptor, newAgg export.
|
||||
|
||||
func (p *CheckpointSet) ForEach(f func(export.Record) error) error {
|
||||
for _, r := range p.updates {
|
||||
if err := f(r); err != nil && !errors.Is(err, aggregator.ErrNoData) {
|
||||
if err := f(r); err != nil && !errors.Is(err, aggregation.ErrNoData) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
31
exporters/otlp/internal/transform/instrumentation.go
Normal file
31
exporters/otlp/internal/transform/instrumentation.go
Normal file
@ -0,0 +1,31 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package transform
|
||||
|
||||
import (
|
||||
commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
|
||||
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
)
|
||||
|
||||
func instrumentationLibrary(il instrumentation.Library) *commonpb.InstrumentationLibrary {
|
||||
if il == (instrumentation.Library{}) {
|
||||
return nil
|
||||
}
|
||||
return &commonpb.InstrumentationLibrary{
|
||||
Name: il.Name,
|
||||
Version: il.Version,
|
||||
}
|
||||
}
|
@ -30,7 +30,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/label"
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
@ -230,9 +230,9 @@ func Record(r export.Record) (*metricpb.Metric, error) {
|
||||
d := r.Descriptor()
|
||||
l := r.Labels()
|
||||
switch a := r.Aggregator().(type) {
|
||||
case aggregator.MinMaxSumCount:
|
||||
case aggregation.MinMaxSumCount:
|
||||
return minMaxSumCount(d, l, a)
|
||||
case aggregator.Sum:
|
||||
case aggregation.Sum:
|
||||
return sum(d, l, a)
|
||||
default:
|
||||
return nil, fmt.Errorf("%w: %v", ErrUnimplementedAgg, a)
|
||||
@ -240,7 +240,7 @@ func Record(r export.Record) (*metricpb.Metric, error) {
|
||||
}
|
||||
|
||||
// sum transforms a Sum Aggregator into an OTLP Metric.
|
||||
func sum(desc *metric.Descriptor, labels *label.Set, a aggregator.Sum) (*metricpb.Metric, error) {
|
||||
func sum(desc *metric.Descriptor, labels *label.Set, a aggregation.Sum) (*metricpb.Metric, error) {
|
||||
sum, err := a.Sum()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -275,7 +275,7 @@ func sum(desc *metric.Descriptor, labels *label.Set, a aggregator.Sum) (*metricp
|
||||
|
||||
// minMaxSumCountValue returns the values of the MinMaxSumCount Aggregator
|
||||
// as discret values.
|
||||
func minMaxSumCountValues(a aggregator.MinMaxSumCount) (min, max, sum metric.Number, count int64, err error) {
|
||||
func minMaxSumCountValues(a aggregation.MinMaxSumCount) (min, max, sum metric.Number, count int64, err error) {
|
||||
if min, err = a.Min(); err != nil {
|
||||
return
|
||||
}
|
||||
@ -292,7 +292,7 @@ func minMaxSumCountValues(a aggregator.MinMaxSumCount) (min, max, sum metric.Num
|
||||
}
|
||||
|
||||
// minMaxSumCount transforms a MinMaxSumCount Aggregator into an OTLP Metric.
|
||||
func minMaxSumCount(desc *metric.Descriptor, labels *label.Set, a aggregator.MinMaxSumCount) (*metricpb.Metric, error) {
|
||||
func minMaxSumCount(desc *metric.Descriptor, labels *label.Set, a aggregation.MinMaxSumCount) (*metricpb.Metric, error) {
|
||||
min, max, sum, count, err := minMaxSumCountValues(a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/label"
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
"go.opentelemetry.io/otel/api/unit"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
|
||||
sumAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
)
|
||||
@ -86,7 +86,7 @@ func TestMinMaxSumCountValue(t *testing.T) {
|
||||
|
||||
// Prior to checkpointing ErrNoData should be returned.
|
||||
_, _, _, _, err := minMaxSumCountValues(mmsc)
|
||||
assert.EqualError(t, err, aggregator.ErrNoData.Error())
|
||||
assert.EqualError(t, err, aggregation.ErrNoData.Error())
|
||||
|
||||
// Checkpoint to set non-zero values
|
||||
mmsc.Checkpoint(&metric.Descriptor{})
|
||||
@ -198,7 +198,7 @@ func TestMinMaxSumCountPropagatesErrors(t *testing.T) {
|
||||
mmsc := minmaxsumcount.New(&metric.Descriptor{})
|
||||
_, _, _, _, err := minMaxSumCountValues(mmsc)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, aggregator.ErrNoData, err)
|
||||
assert.Equal(t, aggregation.ErrNoData, err)
|
||||
}
|
||||
|
||||
func TestSumMetricDescriptor(t *testing.T) {
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/label"
|
||||
apitrace "go.opentelemetry.io/otel/api/trace"
|
||||
export "go.opentelemetry.io/otel/sdk/export/trace"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -33,30 +34,61 @@ func SpanData(sdl []*export.SpanData) []*tracepb.ResourceSpans {
|
||||
if len(sdl) == 0 {
|
||||
return nil
|
||||
}
|
||||
// Group by the distinct representation of the Resource.
|
||||
|
||||
rsm := make(map[label.Distinct]*tracepb.ResourceSpans)
|
||||
|
||||
for _, sd := range sdl {
|
||||
if sd != nil {
|
||||
key := sd.Resource.Equivalent()
|
||||
type ilsKey struct {
|
||||
r label.Distinct
|
||||
il instrumentation.Library
|
||||
}
|
||||
ilsm := make(map[ilsKey]*tracepb.InstrumentationLibrarySpans)
|
||||
|
||||
rs, ok := rsm[key]
|
||||
if !ok {
|
||||
rs = &tracepb.ResourceSpans{
|
||||
Resource: Resource(sd.Resource),
|
||||
InstrumentationLibrarySpans: []*tracepb.InstrumentationLibrarySpans{
|
||||
{
|
||||
Spans: []*tracepb.Span{},
|
||||
},
|
||||
},
|
||||
}
|
||||
rsm[key] = rs
|
||||
var resources int
|
||||
for _, sd := range sdl {
|
||||
if sd == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
rKey := sd.Resource.Equivalent()
|
||||
iKey := ilsKey{
|
||||
r: rKey,
|
||||
il: sd.InstrumentationLibrary,
|
||||
}
|
||||
ils, iOk := ilsm[iKey]
|
||||
if !iOk {
|
||||
// Either the resource or instrumentation library were unknown.
|
||||
ils = &tracepb.InstrumentationLibrarySpans{
|
||||
InstrumentationLibrary: instrumentationLibrary(sd.InstrumentationLibrary),
|
||||
Spans: []*tracepb.Span{},
|
||||
}
|
||||
rs.InstrumentationLibrarySpans[0].Spans =
|
||||
append(rs.InstrumentationLibrarySpans[0].Spans, span(sd))
|
||||
}
|
||||
ils.Spans = append(ils.Spans, span(sd))
|
||||
ilsm[iKey] = ils
|
||||
|
||||
rs, rOk := rsm[rKey]
|
||||
if !rOk {
|
||||
resources++
|
||||
// The resource was unknown.
|
||||
rs = &tracepb.ResourceSpans{
|
||||
Resource: Resource(sd.Resource),
|
||||
InstrumentationLibrarySpans: []*tracepb.InstrumentationLibrarySpans{ils},
|
||||
}
|
||||
rsm[rKey] = rs
|
||||
continue
|
||||
}
|
||||
|
||||
// The resource has been seen before. Check if the instrumentation
|
||||
// library lookup was unknown because if so we need to add it to the
|
||||
// ResourceSpans. Otherwise, the instrumentation library has already
|
||||
// been seen and the append we did above will be included it in the
|
||||
// InstrumentationLibrarySpans reference.
|
||||
if !iOk {
|
||||
rs.InstrumentationLibrarySpans = append(rs.InstrumentationLibrarySpans, ils)
|
||||
}
|
||||
}
|
||||
rss := make([]*tracepb.ResourceSpans, 0, len(rsm))
|
||||
|
||||
// Transform the categorized map into a slice
|
||||
rss := make([]*tracepb.ResourceSpans, 0, resources)
|
||||
for _, rs := range rsm {
|
||||
rss = append(rss, rs)
|
||||
}
|
||||
|
@ -23,11 +23,13 @@ import (
|
||||
"github.com/google/go-cmp/cmp"
|
||||
tracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc/codes"
|
||||
|
||||
"go.opentelemetry.io/otel/api/kv"
|
||||
apitrace "go.opentelemetry.io/otel/api/trace"
|
||||
export "go.opentelemetry.io/otel/sdk/export/trace"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
@ -322,6 +324,10 @@ func TestSpanData(t *testing.T) {
|
||||
DroppedMessageEventCount: 2,
|
||||
DroppedLinkCount: 3,
|
||||
Resource: resource.New(kv.String("rk1", "rv1"), kv.Int64("rk2", 5)),
|
||||
InstrumentationLibrary: instrumentation.Library{
|
||||
Name: "go.opentelemetry.io/test/otel",
|
||||
Version: "v0.0.1",
|
||||
},
|
||||
}
|
||||
|
||||
// Not checking resource as the underlying map of our Resource makes
|
||||
@ -345,16 +351,14 @@ func TestSpanData(t *testing.T) {
|
||||
}
|
||||
|
||||
got := SpanData([]*export.SpanData{spanData})
|
||||
if !assert.Len(t, got, 1) {
|
||||
return
|
||||
}
|
||||
require.Len(t, got, 1)
|
||||
|
||||
// Break the span down as large diffs can be hard to read.
|
||||
actualSpans := got[0].GetInstrumentationLibrarySpans()
|
||||
if !assert.Len(t, actualSpans, 1) && !assert.Len(t, actualSpans[0].Spans, 1) {
|
||||
return
|
||||
}
|
||||
actualSpan := actualSpans[0].Spans[0]
|
||||
assert.Equal(t, got[0].GetResource(), Resource(spanData.Resource))
|
||||
ilSpans := got[0].GetInstrumentationLibrarySpans()
|
||||
require.Len(t, ilSpans, 1)
|
||||
assert.Equal(t, ilSpans[0].GetInstrumentationLibrary(), instrumentationLibrary(spanData.InstrumentationLibrary))
|
||||
require.Len(t, ilSpans[0].Spans, 1)
|
||||
actualSpan := ilSpans[0].Spans[0]
|
||||
|
||||
if diff := cmp.Diff(expectedSpan, actualSpan, cmp.Comparer(proto.Equal)); diff != "" {
|
||||
t.Fatalf("transformed span differs %v\n", diff)
|
||||
@ -363,7 +367,9 @@ func TestSpanData(t *testing.T) {
|
||||
|
||||
// Empty parent span ID should be treated as root span.
|
||||
func TestRootSpanData(t *testing.T) {
|
||||
rs := SpanData([]*export.SpanData{{}})[0]
|
||||
sd := SpanData([]*export.SpanData{{}})
|
||||
require.Len(t, sd, 1)
|
||||
rs := sd[0]
|
||||
got := rs.GetInstrumentationLibrarySpans()[0].GetSpans()[0].GetParentSpanId()
|
||||
|
||||
// Empty means root span.
|
||||
|
@ -30,7 +30,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/label"
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
@ -67,7 +67,7 @@ type checkpointSet struct {
|
||||
|
||||
func (m *checkpointSet) ForEach(fn func(metricsdk.Record) error) error {
|
||||
for _, r := range m.records {
|
||||
if err := fn(r); err != nil && err != aggregator.ErrNoData {
|
||||
if err := fn(r); err != nil && err != aggregation.ErrNoData {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/kv"
|
||||
apitrace "go.opentelemetry.io/otel/api/trace"
|
||||
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
@ -96,6 +97,32 @@ func TestExportSpans(t *testing.T) {
|
||||
StatusCode: codes.OK,
|
||||
StatusMessage: "Ok",
|
||||
Resource: resource.New(kv.String("instance", "tester-a")),
|
||||
InstrumentationLibrary: instrumentation.Library{
|
||||
Name: "lib-a",
|
||||
Version: "v0.1.0",
|
||||
},
|
||||
},
|
||||
{
|
||||
SpanContext: apitrace.SpanContext{
|
||||
TraceID: apitrace.ID([16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}),
|
||||
SpanID: apitrace.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}),
|
||||
TraceFlags: byte(1),
|
||||
},
|
||||
SpanKind: apitrace.SpanKindServer,
|
||||
Name: "secondary parent process",
|
||||
StartTime: startTime,
|
||||
EndTime: endTime,
|
||||
Attributes: []kv.KeyValue{
|
||||
kv.String("user", "alice"),
|
||||
kv.Bool("authenticated", true),
|
||||
},
|
||||
StatusCode: codes.OK,
|
||||
StatusMessage: "Ok",
|
||||
Resource: resource.New(kv.String("instance", "tester-a")),
|
||||
InstrumentationLibrary: instrumentation.Library{
|
||||
Name: "lib-b",
|
||||
Version: "v0.1.0",
|
||||
},
|
||||
},
|
||||
{
|
||||
SpanContext: apitrace.SpanContext{
|
||||
@ -115,6 +142,10 @@ func TestExportSpans(t *testing.T) {
|
||||
StatusCode: codes.OK,
|
||||
StatusMessage: "Ok",
|
||||
Resource: resource.New(kv.String("instance", "tester-a")),
|
||||
InstrumentationLibrary: instrumentation.Library{
|
||||
Name: "lib-a",
|
||||
Version: "v0.1.0",
|
||||
},
|
||||
},
|
||||
{
|
||||
SpanContext: apitrace.SpanContext{
|
||||
@ -133,6 +164,10 @@ func TestExportSpans(t *testing.T) {
|
||||
StatusCode: codes.Unauthenticated,
|
||||
StatusMessage: "Unauthenticated",
|
||||
Resource: resource.New(kv.String("instance", "tester-b")),
|
||||
InstrumentationLibrary: instrumentation.Library{
|
||||
Name: "lib-a",
|
||||
Version: "v1.1.0",
|
||||
},
|
||||
},
|
||||
},
|
||||
[]tracepb.ResourceSpans{
|
||||
@ -148,6 +183,10 @@ func TestExportSpans(t *testing.T) {
|
||||
},
|
||||
InstrumentationLibrarySpans: []*tracepb.InstrumentationLibrarySpans{
|
||||
{
|
||||
InstrumentationLibrary: &commonpb.InstrumentationLibrary{
|
||||
Name: "lib-a",
|
||||
Version: "v0.1.0",
|
||||
},
|
||||
Spans: []*tracepb.Span{
|
||||
{
|
||||
TraceId: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
|
||||
@ -200,6 +239,38 @@ func TestExportSpans(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
InstrumentationLibrary: &commonpb.InstrumentationLibrary{
|
||||
Name: "lib-b",
|
||||
Version: "v0.1.0",
|
||||
},
|
||||
Spans: []*tracepb.Span{
|
||||
{
|
||||
TraceId: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2},
|
||||
SpanId: []byte{0, 0, 0, 0, 0, 0, 0, 1},
|
||||
Name: "secondary parent process",
|
||||
Kind: tracepb.Span_SERVER,
|
||||
StartTimeUnixNano: uint64(startTime.UnixNano()),
|
||||
EndTimeUnixNano: uint64(endTime.UnixNano()),
|
||||
Attributes: []*commonpb.AttributeKeyValue{
|
||||
{
|
||||
Key: "user",
|
||||
Type: commonpb.AttributeKeyValue_STRING,
|
||||
StringValue: "alice",
|
||||
},
|
||||
{
|
||||
Key: "authenticated",
|
||||
Type: commonpb.AttributeKeyValue_BOOL,
|
||||
BoolValue: true,
|
||||
},
|
||||
},
|
||||
Status: &tracepb.Status{
|
||||
Code: tracepb.Status_Ok,
|
||||
Message: "Ok",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -214,6 +285,10 @@ func TestExportSpans(t *testing.T) {
|
||||
},
|
||||
InstrumentationLibrarySpans: []*tracepb.InstrumentationLibrarySpans{
|
||||
{
|
||||
InstrumentationLibrary: &commonpb.InstrumentationLibrary{
|
||||
Name: "lib-a",
|
||||
Version: "v1.1.0",
|
||||
},
|
||||
Spans: []*tracepb.Span{
|
||||
{
|
||||
TraceId: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2},
|
||||
|
@ -216,6 +216,13 @@ func spanDataToThrift(data *export.SpanData) *gen.Span {
|
||||
}
|
||||
}
|
||||
|
||||
if il := data.InstrumentationLibrary; il.Name != "" {
|
||||
tags = append(tags, getStringTag("instrumentation.name", il.Name))
|
||||
if il.Version != "" {
|
||||
tags = append(tags, getStringTag("instrumentation.version", il.Name))
|
||||
}
|
||||
}
|
||||
|
||||
tags = append(tags,
|
||||
getInt64Tag("status.code", int64(data.StatusCode)),
|
||||
getStringTag("status.message", data.StatusMessage),
|
||||
|
@ -122,7 +122,11 @@ func TestExporter_ExportSpan(t *testing.T) {
|
||||
`{` +
|
||||
`"Key":"rk1",` +
|
||||
`"Value":{"Type":"STRING","Value":"rv11"}` +
|
||||
`}]}` + "\n"
|
||||
`}],` +
|
||||
`"InstrumentationLibrary":{` +
|
||||
`"Name":"",` +
|
||||
`"Version":""` +
|
||||
`}}` + "\n"
|
||||
|
||||
if got != expectedOutput {
|
||||
t.Errorf("Want: %v but got: %v", expectedOutput, got)
|
||||
|
@ -12,54 +12,68 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package aggregator // import "go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
package aggregation // import "go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
)
|
||||
|
||||
// These interfaces describe the various ways to access state from an
|
||||
// Aggregator.
|
||||
// Aggregation.
|
||||
|
||||
type (
|
||||
// Aggregation is an interface returned by the Aggregator
|
||||
// containing an interval of metric data.
|
||||
Aggregation interface {
|
||||
// Kind returns a short identifying string to identify
|
||||
// the Aggregator that was used to produce the
|
||||
// Aggregation (e.g., "Sum").
|
||||
Kind() Kind
|
||||
}
|
||||
|
||||
// Sum returns an aggregated sum.
|
||||
Sum interface {
|
||||
Aggregation
|
||||
Sum() (metric.Number, error)
|
||||
}
|
||||
|
||||
// Sum returns the number of values that were aggregated.
|
||||
Count interface {
|
||||
Aggregation
|
||||
Count() (int64, error)
|
||||
}
|
||||
|
||||
// Min returns the minimum value over the set of values that were aggregated.
|
||||
Min interface {
|
||||
Aggregation
|
||||
Min() (metric.Number, error)
|
||||
}
|
||||
|
||||
// Max returns the maximum value over the set of values that were aggregated.
|
||||
Max interface {
|
||||
Aggregation
|
||||
Max() (metric.Number, error)
|
||||
}
|
||||
|
||||
// Quantile returns an exact or estimated quantile over the
|
||||
// set of values that were aggregated.
|
||||
Quantile interface {
|
||||
Aggregation
|
||||
Quantile(float64) (metric.Number, error)
|
||||
}
|
||||
|
||||
// LastValue returns the latest value that was aggregated.
|
||||
LastValue interface {
|
||||
Aggregation
|
||||
LastValue() (metric.Number, time.Time, error)
|
||||
}
|
||||
|
||||
// Points returns the raw set of values that were aggregated.
|
||||
Points interface {
|
||||
Aggregation
|
||||
Points() ([]metric.Number, error)
|
||||
}
|
||||
|
||||
@ -80,26 +94,58 @@ type (
|
||||
|
||||
// Histogram returns the count of events in pre-determined buckets.
|
||||
Histogram interface {
|
||||
Sum
|
||||
Aggregation
|
||||
Sum() (metric.Number, error)
|
||||
Histogram() (Buckets, error)
|
||||
}
|
||||
|
||||
// MinMaxSumCount supports the Min, Max, Sum, and Count interfaces.
|
||||
MinMaxSumCount interface {
|
||||
Min
|
||||
Max
|
||||
Sum
|
||||
Count
|
||||
Aggregation
|
||||
Min() (metric.Number, error)
|
||||
Max() (metric.Number, error)
|
||||
Sum() (metric.Number, error)
|
||||
Count() (int64, error)
|
||||
}
|
||||
|
||||
// Distribution supports the Min, Max, Sum, Count, and Quantile
|
||||
// interfaces.
|
||||
Distribution interface {
|
||||
MinMaxSumCount
|
||||
Quantile
|
||||
Aggregation
|
||||
Min() (metric.Number, error)
|
||||
Max() (metric.Number, error)
|
||||
Sum() (metric.Number, error)
|
||||
Count() (int64, error)
|
||||
Quantile(float64) (metric.Number, error)
|
||||
}
|
||||
)
|
||||
|
||||
type (
|
||||
// Kind is a short name for the Aggregator that produces an
|
||||
// Aggregation, used for descriptive purpose only. Kind is a
|
||||
// string to allow user-defined Aggregators.
|
||||
//
|
||||
// When deciding how to handle an Aggregation, Exporters are
|
||||
// encouraged to decide based on conversion to the above
|
||||
// interfaces based on strength, not on Kind value, when
|
||||
// deciding how to expose metric data. This enables
|
||||
// user-supplied Aggregators to replace builtin Aggregators.
|
||||
//
|
||||
// For example, test for a Distribution before testing for a
|
||||
// MinMaxSumCount, test for a Histogram before testing for a
|
||||
// Sum, and so on.
|
||||
Kind string
|
||||
)
|
||||
|
||||
const (
|
||||
SumKind Kind = "Sum"
|
||||
MinMaxSumCountKind Kind = "MinMaxSumCount"
|
||||
HistogramKind Kind = "Histogram"
|
||||
LastValueKind Kind = "Lastvalue"
|
||||
SketchKind Kind = "Sketch"
|
||||
ExactKind Kind = "Exact"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrInvalidQuantile = fmt.Errorf("the requested quantile is out of range")
|
||||
ErrNegativeInput = fmt.Errorf("negative value is out of range for this instrument")
|
||||
@ -112,29 +158,7 @@ var (
|
||||
ErrNoData = fmt.Errorf("no data collected by this aggregator")
|
||||
)
|
||||
|
||||
// NewInconsistentMergeError formats an error describing an attempt to
|
||||
// merge different-type aggregators. The result can be unwrapped as
|
||||
// an ErrInconsistentType.
|
||||
func NewInconsistentMergeError(a1, a2 export.Aggregator) error {
|
||||
return fmt.Errorf("cannot merge %T with %T: %w", a1, a2, ErrInconsistentType)
|
||||
}
|
||||
|
||||
// RangeTest is a commmon routine for testing for valid input values.
|
||||
// This rejects NaN values. This rejects negative values when the
|
||||
// metric instrument does not support negative values, including
|
||||
// monotonic counter metrics and absolute ValueRecorder metrics.
|
||||
func RangeTest(number metric.Number, descriptor *metric.Descriptor) error {
|
||||
numberKind := descriptor.NumberKind()
|
||||
|
||||
if numberKind == metric.Float64NumberKind && math.IsNaN(number.AsFloat64()) {
|
||||
return ErrNaNInput
|
||||
}
|
||||
|
||||
switch descriptor.MetricKind() {
|
||||
case metric.CounterKind, metric.SumObserverKind:
|
||||
if number.IsNegative(numberKind) {
|
||||
return ErrNegativeInput
|
||||
}
|
||||
}
|
||||
return nil
|
||||
// String returns the string value of Kind.
|
||||
func (k Kind) String() string {
|
||||
return string(k)
|
||||
}
|
@ -22,6 +22,7 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/api/kv"
|
||||
apitrace "go.opentelemetry.io/otel/api/trace"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
@ -71,6 +72,10 @@ type SpanData struct {
|
||||
|
||||
// Resource contains attributes representing an entity that produced this span.
|
||||
Resource *resource.Resource
|
||||
|
||||
// InstrumentationLibrary defines the instrumentation library used to
|
||||
// providing instrumentation.
|
||||
InstrumentationLibrary instrumentation.Library
|
||||
}
|
||||
|
||||
// Event is used to describe an Event with a message string and set of
|
||||
|
51
sdk/metric/aggregator/aggregator.go
Normal file
51
sdk/metric/aggregator/aggregator.go
Normal file
@ -0,0 +1,51 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package aggregator // import "go.opentelemetry.io/otel/sdk/metric/aggregator"
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
)
|
||||
|
||||
// NewInconsistentMergeError formats an error describing an attempt to
|
||||
// merge different-type aggregators. The result can be unwrapped as
|
||||
// an ErrInconsistentType.
|
||||
func NewInconsistentMergeError(a1, a2 export.Aggregator) error {
|
||||
return fmt.Errorf("cannot merge %T with %T: %w", a1, a2, aggregation.ErrInconsistentType)
|
||||
}
|
||||
|
||||
// RangeTest is a commmon routine for testing for valid input values.
|
||||
// This rejects NaN values. This rejects negative values when the
|
||||
// metric instrument does not support negative values, including
|
||||
// monotonic counter metrics and absolute ValueRecorder metrics.
|
||||
func RangeTest(number metric.Number, descriptor *metric.Descriptor) error {
|
||||
numberKind := descriptor.NumberKind()
|
||||
|
||||
if numberKind == metric.Float64NumberKind && math.IsNaN(number.AsFloat64()) {
|
||||
return aggregation.ErrNaNInput
|
||||
}
|
||||
|
||||
switch descriptor.MetricKind() {
|
||||
case metric.CounterKind, metric.SumObserverKind:
|
||||
if number.IsNegative(numberKind) {
|
||||
return aggregation.ErrNegativeInput
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package aggregator_test // import "go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
package aggregator_test // import "go.opentelemetry.io/otel/sdk/metric/aggregator"
|
||||
|
||||
import (
|
||||
"errors"
|
||||
@ -22,7 +22,8 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
)
|
||||
@ -34,7 +35,7 @@ func TestInconsistentMergeErr(t *testing.T) {
|
||||
"cannot merge *sum.Aggregator with *lastvalue.Aggregator: inconsistent aggregator types",
|
||||
err.Error(),
|
||||
)
|
||||
require.True(t, errors.Is(err, aggregator.ErrInconsistentType))
|
||||
require.True(t, errors.Is(err, aggregation.ErrInconsistentType))
|
||||
}
|
||||
|
||||
func testRangeNaN(t *testing.T, desc *metric.Descriptor) {
|
||||
@ -43,7 +44,7 @@ func testRangeNaN(t *testing.T, desc *metric.Descriptor) {
|
||||
err := aggregator.RangeTest(nan, desc)
|
||||
|
||||
if desc.NumberKind() == metric.Float64NumberKind {
|
||||
require.Equal(t, aggregator.ErrNaNInput, err)
|
||||
require.Equal(t, aggregation.ErrNaNInput, err)
|
||||
} else {
|
||||
require.Nil(t, err)
|
||||
}
|
||||
@ -64,7 +65,7 @@ func testRangeNegative(t *testing.T, desc *metric.Descriptor) {
|
||||
negErr := aggregator.RangeTest(neg, desc)
|
||||
|
||||
require.Nil(t, posErr)
|
||||
require.Equal(t, negErr, aggregator.ErrNegativeInput)
|
||||
require.Equal(t, negErr, aggregation.ErrNegativeInput)
|
||||
}
|
||||
|
||||
func TestRangeTest(t *testing.T) {
|
@ -23,7 +23,8 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator"
|
||||
)
|
||||
|
||||
type (
|
||||
@ -41,9 +42,9 @@ type (
|
||||
)
|
||||
|
||||
var _ export.Aggregator = &Aggregator{}
|
||||
var _ aggregator.MinMaxSumCount = &Aggregator{}
|
||||
var _ aggregator.Distribution = &Aggregator{}
|
||||
var _ aggregator.Points = &Aggregator{}
|
||||
var _ aggregation.MinMaxSumCount = &Aggregator{}
|
||||
var _ aggregation.Distribution = &Aggregator{}
|
||||
var _ aggregation.Points = &Aggregator{}
|
||||
|
||||
// New returns a new array aggregator, which aggregates recorded
|
||||
// measurements by storing them in an array. This type uses a mutex
|
||||
@ -52,6 +53,11 @@ func New() *Aggregator {
|
||||
return &Aggregator{}
|
||||
}
|
||||
|
||||
// Kind returns aggregation.ExactKind.
|
||||
func (c *Aggregator) Kind() aggregation.Kind {
|
||||
return aggregation.ExactKind
|
||||
}
|
||||
|
||||
// Sum returns the sum of values in the checkpoint.
|
||||
func (c *Aggregator) Sum() (metric.Number, error) {
|
||||
return c.ckptSum, nil
|
||||
@ -179,11 +185,11 @@ func (p *points) Swap(i, j int) {
|
||||
// of a quantile.
|
||||
func (p *points) Quantile(q float64) (metric.Number, error) {
|
||||
if len(*p) == 0 {
|
||||
return metric.Number(0), aggregator.ErrNoData
|
||||
return metric.Number(0), aggregation.ErrNoData
|
||||
}
|
||||
|
||||
if q < 0 || q > 1 {
|
||||
return metric.Number(0), aggregator.ErrInvalidQuantile
|
||||
return metric.Number(0), aggregation.ErrInvalidQuantile
|
||||
}
|
||||
|
||||
if q == 0 || len(*p) == 1 {
|
||||
|
@ -25,7 +25,7 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
ottest "go.opentelemetry.io/otel/internal/testing"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/test"
|
||||
)
|
||||
|
||||
@ -199,15 +199,15 @@ func TestArrayErrors(t *testing.T) {
|
||||
|
||||
_, err := agg.Max()
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, aggregator.ErrNoData)
|
||||
require.Equal(t, err, aggregation.ErrNoData)
|
||||
|
||||
_, err = agg.Min()
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, aggregator.ErrNoData)
|
||||
require.Equal(t, err, aggregation.ErrNoData)
|
||||
|
||||
_, err = agg.Quantile(0.1)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, aggregator.ErrNoData)
|
||||
require.Equal(t, err, aggregation.ErrNoData)
|
||||
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
@ -228,11 +228,11 @@ func TestArrayErrors(t *testing.T) {
|
||||
|
||||
_, err = agg.Quantile(-0.0001)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, aggregator.ErrInvalidQuantile)
|
||||
require.Equal(t, err, aggregation.ErrInvalidQuantile)
|
||||
|
||||
_, err = agg.Quantile(1.0001)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, aggregator.ErrInvalidQuantile)
|
||||
require.Equal(t, err, aggregation.ErrInvalidQuantile)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -21,9 +21,9 @@ import (
|
||||
sdk "github.com/DataDog/sketches-go/ddsketch"
|
||||
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator"
|
||||
)
|
||||
|
||||
// Config is an alias for the underlying DDSketch config object.
|
||||
@ -39,8 +39,8 @@ type Aggregator struct {
|
||||
}
|
||||
|
||||
var _ export.Aggregator = &Aggregator{}
|
||||
var _ aggregator.MinMaxSumCount = &Aggregator{}
|
||||
var _ aggregator.Distribution = &Aggregator{}
|
||||
var _ aggregation.MinMaxSumCount = &Aggregator{}
|
||||
var _ aggregation.Distribution = &Aggregator{}
|
||||
|
||||
// New returns a new DDSketch aggregator.
|
||||
func New(desc *metric.Descriptor, cfg *Config) *Aggregator {
|
||||
@ -52,6 +52,11 @@ func New(desc *metric.Descriptor, cfg *Config) *Aggregator {
|
||||
}
|
||||
}
|
||||
|
||||
// Kind returns aggregation.SketchKind.
|
||||
func (c *Aggregator) Kind() aggregation.Kind {
|
||||
return aggregation.SketchKind
|
||||
}
|
||||
|
||||
// NewDefaultConfig returns a new, default DDSketch config.
|
||||
//
|
||||
// TODO: Should the Config constructor set minValue to -Inf to
|
||||
@ -85,11 +90,11 @@ func (c *Aggregator) Min() (metric.Number, error) {
|
||||
// It is an error if `q` is less than 0 or greated than 1.
|
||||
func (c *Aggregator) Quantile(q float64) (metric.Number, error) {
|
||||
if c.checkpoint.Count() == 0 {
|
||||
return metric.Number(0), aggregator.ErrNoData
|
||||
return metric.Number(0), aggregation.ErrNoData
|
||||
}
|
||||
f := c.checkpoint.Quantile(q)
|
||||
if math.IsNaN(f) {
|
||||
return metric.Number(0), aggregator.ErrInvalidQuantile
|
||||
return metric.Number(0), aggregation.ErrInvalidQuantile
|
||||
}
|
||||
return c.toNumber(f), nil
|
||||
}
|
||||
|
@ -21,7 +21,8 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator"
|
||||
)
|
||||
|
||||
// Note: This code uses a Mutex to govern access to the exclusive
|
||||
@ -51,9 +52,9 @@ type (
|
||||
)
|
||||
|
||||
var _ export.Aggregator = &Aggregator{}
|
||||
var _ aggregator.Sum = &Aggregator{}
|
||||
var _ aggregator.Count = &Aggregator{}
|
||||
var _ aggregator.Histogram = &Aggregator{}
|
||||
var _ aggregation.Sum = &Aggregator{}
|
||||
var _ aggregation.Count = &Aggregator{}
|
||||
var _ aggregation.Histogram = &Aggregator{}
|
||||
|
||||
// New returns a new aggregator for computing Histograms.
|
||||
//
|
||||
@ -79,6 +80,11 @@ func New(desc *metric.Descriptor, boundaries []float64) *Aggregator {
|
||||
}
|
||||
}
|
||||
|
||||
// Kind returns aggregation.HistogramKind.
|
||||
func (c *Aggregator) Kind() aggregation.Kind {
|
||||
return aggregation.HistogramKind
|
||||
}
|
||||
|
||||
// Sum returns the sum of all values in the checkpoint.
|
||||
func (c *Aggregator) Sum() (metric.Number, error) {
|
||||
c.lock.Lock()
|
||||
@ -94,10 +100,10 @@ func (c *Aggregator) Count() (int64, error) {
|
||||
}
|
||||
|
||||
// Histogram returns the count of events in pre-determined buckets.
|
||||
func (c *Aggregator) Histogram() (aggregator.Buckets, error) {
|
||||
func (c *Aggregator) Histogram() (aggregation.Buckets, error) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
return aggregator.Buckets{
|
||||
return aggregation.Buckets{
|
||||
Boundaries: c.boundaries,
|
||||
Counts: c.checkpoint.bucketCounts,
|
||||
}, nil
|
||||
|
@ -22,7 +22,8 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator"
|
||||
)
|
||||
|
||||
type (
|
||||
@ -53,7 +54,7 @@ type (
|
||||
)
|
||||
|
||||
var _ export.Aggregator = &Aggregator{}
|
||||
var _ aggregator.LastValue = &Aggregator{}
|
||||
var _ aggregation.LastValue = &Aggregator{}
|
||||
|
||||
// An unset lastValue has zero timestamp and zero value.
|
||||
var unsetLastValue = &lastValueData{}
|
||||
@ -67,14 +68,19 @@ func New() *Aggregator {
|
||||
}
|
||||
}
|
||||
|
||||
// Kind returns aggregation.LastValueKind.
|
||||
func (g *Aggregator) Kind() aggregation.Kind {
|
||||
return aggregation.LastValueKind
|
||||
}
|
||||
|
||||
// LastValue returns the last-recorded lastValue value and the
|
||||
// corresponding timestamp. The error value aggregator.ErrNoData
|
||||
// corresponding timestamp. The error value aggregation.ErrNoData
|
||||
// will be returned if (due to a race condition) the checkpoint was
|
||||
// computed before the first value was set.
|
||||
func (g *Aggregator) LastValue() (metric.Number, time.Time, error) {
|
||||
gd := (*lastValueData)(g.checkpoint)
|
||||
if gd == unsetLastValue {
|
||||
return metric.Number(0), time.Time{}, aggregator.ErrNoData
|
||||
return metric.Number(0), time.Time{}, aggregation.ErrNoData
|
||||
}
|
||||
return gd.value.AsNumber(), gd.timestamp, nil
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
ottest "go.opentelemetry.io/otel/internal/testing"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/test"
|
||||
)
|
||||
|
||||
@ -108,7 +108,7 @@ func TestLastValueNotSet(t *testing.T) {
|
||||
g.Checkpoint(descriptor)
|
||||
|
||||
value, timestamp, err := g.LastValue()
|
||||
require.Equal(t, aggregator.ErrNoData, err)
|
||||
require.Equal(t, aggregation.ErrNoData, err)
|
||||
require.True(t, timestamp.IsZero())
|
||||
require.Equal(t, metric.Number(0), value)
|
||||
}
|
||||
|
@ -20,7 +20,8 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator"
|
||||
)
|
||||
|
||||
type (
|
||||
@ -42,7 +43,7 @@ type (
|
||||
)
|
||||
|
||||
var _ export.Aggregator = &Aggregator{}
|
||||
var _ aggregator.MinMaxSumCount = &Aggregator{}
|
||||
var _ aggregation.MinMaxSumCount = &Aggregator{}
|
||||
|
||||
// New returns a new aggregator for computing the min, max, sum, and
|
||||
// count. It does not compute quantile information other than Min and
|
||||
@ -62,6 +63,11 @@ func New(desc *metric.Descriptor) *Aggregator {
|
||||
}
|
||||
}
|
||||
|
||||
// Kind returns aggregation.MinMaxSumCountKind.
|
||||
func (c *Aggregator) Kind() aggregation.Kind {
|
||||
return aggregation.MinMaxSumCountKind
|
||||
}
|
||||
|
||||
// Sum returns the sum of values in the checkpoint.
|
||||
func (c *Aggregator) Sum() (metric.Number, error) {
|
||||
c.lock.Lock()
|
||||
@ -77,25 +83,25 @@ func (c *Aggregator) Count() (int64, error) {
|
||||
}
|
||||
|
||||
// Min returns the minimum value in the checkpoint.
|
||||
// The error value aggregator.ErrNoData will be returned
|
||||
// The error value aggregation.ErrNoData will be returned
|
||||
// if there were no measurements recorded during the checkpoint.
|
||||
func (c *Aggregator) Min() (metric.Number, error) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
if c.checkpoint.count.IsZero(metric.Uint64NumberKind) {
|
||||
return c.kind.Zero(), aggregator.ErrNoData
|
||||
return c.kind.Zero(), aggregation.ErrNoData
|
||||
}
|
||||
return c.checkpoint.min, nil
|
||||
}
|
||||
|
||||
// Max returns the maximum value in the checkpoint.
|
||||
// The error value aggregator.ErrNoData will be returned
|
||||
// The error value aggregation.ErrNoData will be returned
|
||||
// if there were no measurements recorded during the checkpoint.
|
||||
func (c *Aggregator) Max() (metric.Number, error) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
if c.checkpoint.count.IsZero(metric.Uint64NumberKind) {
|
||||
return c.kind.Zero(), aggregator.ErrNoData
|
||||
return c.kind.Zero(), aggregation.ErrNoData
|
||||
}
|
||||
return c.checkpoint.max, nil
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/test"
|
||||
)
|
||||
|
||||
@ -193,7 +193,7 @@ func TestMaxSumCountNotSet(t *testing.T) {
|
||||
require.Nil(t, err)
|
||||
|
||||
max, err := agg.Max()
|
||||
require.Equal(t, aggregator.ErrNoData, err)
|
||||
require.Equal(t, aggregation.ErrNoData, err)
|
||||
require.Equal(t, metric.Number(0), max)
|
||||
})
|
||||
}
|
||||
|
@ -19,7 +19,8 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator"
|
||||
)
|
||||
|
||||
// Aggregator aggregates counter events.
|
||||
@ -34,15 +35,20 @@ type Aggregator struct {
|
||||
}
|
||||
|
||||
var _ export.Aggregator = &Aggregator{}
|
||||
var _ aggregator.Sum = &Aggregator{}
|
||||
var _ aggregation.Sum = &Aggregator{}
|
||||
|
||||
// New returns a new counter aggregator implemented by atomic
|
||||
// operations. This aggregator implements the aggregator.Sum
|
||||
// operations. This aggregator implements the aggregation.Sum
|
||||
// export interface.
|
||||
func New() *Aggregator {
|
||||
return &Aggregator{}
|
||||
}
|
||||
|
||||
// Kind returns aggregation.SumKind.
|
||||
func (c *Aggregator) Kind() aggregation.Kind {
|
||||
return aggregation.SumKind
|
||||
}
|
||||
|
||||
// Sum returns the last-checkpointed sum. This will never return an
|
||||
// error.
|
||||
func (c *Aggregator) Sum() (metric.Number, error) {
|
||||
|
@ -25,7 +25,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
ottest "go.opentelemetry.io/otel/internal/testing"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator"
|
||||
)
|
||||
|
||||
const Magnitude = 1000
|
||||
|
@ -30,7 +30,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
"go.opentelemetry.io/otel/exporters/metric/test"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
"go.opentelemetry.io/otel/sdk/metric/controller/push"
|
||||
controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test"
|
||||
@ -175,7 +175,7 @@ func TestPushTicker(t *testing.T) {
|
||||
require.Equal(t, "counter", records[0].Descriptor().Name())
|
||||
require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder()))
|
||||
|
||||
sum, err := records[0].Aggregator().(aggregator.Sum).Sum()
|
||||
sum, err := records[0].Aggregator().(aggregation.Sum).Sum()
|
||||
require.Equal(t, int64(3), sum.AsInt64())
|
||||
require.Nil(t, err)
|
||||
|
||||
@ -192,7 +192,7 @@ func TestPushTicker(t *testing.T) {
|
||||
require.Equal(t, "counter", records[0].Descriptor().Name())
|
||||
require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder()))
|
||||
|
||||
sum, err = records[0].Aggregator().(aggregator.Sum).Sum()
|
||||
sum, err = records[0].Aggregator().(aggregation.Sum).Sum()
|
||||
require.Equal(t, int64(7), sum.AsInt64())
|
||||
require.Nil(t, err)
|
||||
|
||||
@ -216,7 +216,7 @@ func TestPushExportError(t *testing.T) {
|
||||
expectedError error
|
||||
}{
|
||||
{"errNone", nil, []string{"counter1{R=V,X=Y}", "counter2{R=V,}"}, nil},
|
||||
{"errNoData", aggregator.ErrNoData, []string{"counter2{R=V,}"}, nil},
|
||||
{"errNoData", aggregation.ErrNoData, []string{"counter2{R=V,}"}, nil},
|
||||
{"errUnexpected", errAggregator, []string{}, errAggregator},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
|
@ -30,7 +30,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/label"
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
metricsdk "go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
@ -131,7 +131,7 @@ func TestInputRangeCounter(t *testing.T) {
|
||||
counter := Must(meter).NewInt64Counter("name.counter")
|
||||
|
||||
counter.Add(ctx, -1)
|
||||
require.Equal(t, aggregator.ErrNegativeInput, testHandler.Flush())
|
||||
require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush())
|
||||
|
||||
checkpointed := sdk.Collect(ctx)
|
||||
require.Equal(t, 0, checkpointed)
|
||||
@ -139,7 +139,7 @@ func TestInputRangeCounter(t *testing.T) {
|
||||
integrator.records = nil
|
||||
counter.Add(ctx, 1)
|
||||
checkpointed = sdk.Collect(ctx)
|
||||
sum, err := integrator.records[0].Aggregator().(aggregator.Sum).Sum()
|
||||
sum, err := integrator.records[0].Aggregator().(aggregation.Sum).Sum()
|
||||
require.Equal(t, int64(1), sum.AsInt64())
|
||||
require.Equal(t, 1, checkpointed)
|
||||
require.Nil(t, err)
|
||||
@ -158,7 +158,7 @@ func TestInputRangeUpDownCounter(t *testing.T) {
|
||||
counter.Add(ctx, 1)
|
||||
|
||||
checkpointed := sdk.Collect(ctx)
|
||||
sum, err := integrator.records[0].Aggregator().(aggregator.Sum).Sum()
|
||||
sum, err := integrator.records[0].Aggregator().(aggregation.Sum).Sum()
|
||||
require.Equal(t, int64(1), sum.AsInt64())
|
||||
require.Equal(t, 1, checkpointed)
|
||||
require.Nil(t, err)
|
||||
@ -172,7 +172,7 @@ func TestInputRangeValueRecorder(t *testing.T) {
|
||||
valuerecorder := Must(meter).NewFloat64ValueRecorder("name.valuerecorder")
|
||||
|
||||
valuerecorder.Record(ctx, math.NaN())
|
||||
require.Equal(t, aggregator.ErrNaNInput, testHandler.Flush())
|
||||
require.Equal(t, aggregation.ErrNaNInput, testHandler.Flush())
|
||||
|
||||
checkpointed := sdk.Collect(ctx)
|
||||
require.Equal(t, 0, checkpointed)
|
||||
@ -183,7 +183,7 @@ func TestInputRangeValueRecorder(t *testing.T) {
|
||||
integrator.records = nil
|
||||
checkpointed = sdk.Collect(ctx)
|
||||
|
||||
count, err := integrator.records[0].Aggregator().(aggregator.Distribution).Count()
|
||||
count, err := integrator.records[0].Aggregator().(aggregation.Distribution).Count()
|
||||
require.Equal(t, int64(2), count)
|
||||
require.Equal(t, 1, checkpointed)
|
||||
require.Nil(t, testHandler.Flush())
|
||||
@ -269,7 +269,7 @@ func TestSDKLabelsDeduplication(t *testing.T) {
|
||||
|
||||
var actual [][]kv.KeyValue
|
||||
for _, rec := range integrator.records {
|
||||
sum, _ := rec.Aggregator().(aggregator.Sum).Sum()
|
||||
sum, _ := rec.Aggregator().(aggregation.Sum).Sum()
|
||||
require.Equal(t, sum, metric.NewInt64Number(2))
|
||||
|
||||
kvs := rec.Labels().ToSlice()
|
||||
@ -392,15 +392,15 @@ func TestSumObserverInputRange(t *testing.T) {
|
||||
|
||||
_ = Must(meter).NewFloat64SumObserver("float.sumobserver", func(_ context.Context, result metric.Float64ObserverResult) {
|
||||
result.Observe(-2, kv.String("A", "B"))
|
||||
require.Equal(t, aggregator.ErrNegativeInput, testHandler.Flush())
|
||||
require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush())
|
||||
result.Observe(-1, kv.String("C", "D"))
|
||||
require.Equal(t, aggregator.ErrNegativeInput, testHandler.Flush())
|
||||
require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush())
|
||||
})
|
||||
_ = Must(meter).NewInt64SumObserver("int.sumobserver", func(_ context.Context, result metric.Int64ObserverResult) {
|
||||
result.Observe(-1, kv.String("A", "B"))
|
||||
require.Equal(t, aggregator.ErrNegativeInput, testHandler.Flush())
|
||||
require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush())
|
||||
result.Observe(-1)
|
||||
require.Equal(t, aggregator.ErrNegativeInput, testHandler.Flush())
|
||||
require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush())
|
||||
})
|
||||
|
||||
collected := sdk.Collect(ctx)
|
||||
|
@ -21,7 +21,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/label"
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
@ -119,7 +119,7 @@ func (b *batch) ForEach(f func(export.Record) error) error {
|
||||
value.labels,
|
||||
value.resource,
|
||||
value.aggregator,
|
||||
)); err != nil && !errors.Is(err, aggregator.ErrNoData) {
|
||||
)); err != nil && !errors.Is(err, aggregation.ErrNoData) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/label"
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
@ -162,10 +162,10 @@ func (o Output) AddTo(rec export.Record) error {
|
||||
key := fmt.Sprint(rec.Descriptor().Name(), "/", encoded, "/", rencoded)
|
||||
var value float64
|
||||
|
||||
if s, ok := rec.Aggregator().(aggregator.Sum); ok {
|
||||
if s, ok := rec.Aggregator().(aggregation.Sum); ok {
|
||||
sum, _ := s.Sum()
|
||||
value = sum.CoerceToFloat64(rec.Descriptor().NumberKind())
|
||||
} else if l, ok := rec.Aggregator().(aggregator.LastValue); ok {
|
||||
} else if l, ok := rec.Aggregator().(aggregation.LastValue); ok {
|
||||
last, _, _ := l.LastValue()
|
||||
value = last.CoerceToFloat64(rec.Descriptor().NumberKind())
|
||||
} else {
|
||||
|
@ -28,7 +28,7 @@ import (
|
||||
api "go.opentelemetry.io/otel/api/metric"
|
||||
internal "go.opentelemetry.io/otel/internal/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
|
@ -35,7 +35,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
api "go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
)
|
||||
@ -280,14 +280,14 @@ func (f *testFixture) Process(record export.Record) error {
|
||||
agg := record.Aggregator()
|
||||
switch record.Descriptor().MetricKind() {
|
||||
case metric.CounterKind:
|
||||
sum, err := agg.(aggregator.Sum).Sum()
|
||||
sum, err := agg.(aggregation.Sum).Sum()
|
||||
if err != nil {
|
||||
f.T.Fatal("Sum error: ", err)
|
||||
}
|
||||
f.impl.storeCollect(actual, sum, time.Time{})
|
||||
case metric.ValueRecorderKind:
|
||||
lv, ts, err := agg.(aggregator.LastValue).LastValue()
|
||||
if err != nil && err != aggregator.ErrNoData {
|
||||
lv, ts, err := agg.(aggregation.LastValue).LastValue()
|
||||
if err != nil && err != aggregation.ErrNoData {
|
||||
f.T.Fatal("Last value error: ", err)
|
||||
}
|
||||
f.impl.storeCollect(actual, lv, ts)
|
||||
|
@ -342,12 +342,13 @@ func startSpanInternal(tr *tracer, name string, parent apitrace.SpanContext, rem
|
||||
startTime = time.Now()
|
||||
}
|
||||
span.data = &export.SpanData{
|
||||
SpanContext: span.spanContext,
|
||||
StartTime: startTime,
|
||||
SpanKind: apitrace.ValidateSpanKind(o.SpanKind),
|
||||
Name: name,
|
||||
HasRemoteParent: remoteParent,
|
||||
Resource: cfg.Resource,
|
||||
SpanContext: span.spanContext,
|
||||
StartTime: startTime,
|
||||
SpanKind: apitrace.ValidateSpanKind(o.SpanKind),
|
||||
Name: name,
|
||||
HasRemoteParent: remoteParent,
|
||||
Resource: cfg.Resource,
|
||||
InstrumentationLibrary: tr.instrumentationLibrary,
|
||||
}
|
||||
span.attributes = newAttributesMap(cfg.MaxAttributesPerSpan)
|
||||
span.messageEvents = newEvictedQueue(cfg.MaxEventsPerSpan)
|
||||
|
@ -36,6 +36,7 @@ import (
|
||||
apitrace "go.opentelemetry.io/otel/api/trace"
|
||||
ottest "go.opentelemetry.io/otel/internal/testing"
|
||||
export "go.opentelemetry.io/otel/sdk/export/trace"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
@ -301,8 +302,9 @@ func TestSetSpanAttributesOnStart(t *testing.T) {
|
||||
kv.String("key1", "value1"),
|
||||
kv.String("key2", "value2"),
|
||||
},
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
HasRemoteParent: true,
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
HasRemoteParent: true,
|
||||
InstrumentationLibrary: instrumentation.Library{Name: "StartSpanAttribute"},
|
||||
}
|
||||
if diff := cmpDiff(got, want); diff != "" {
|
||||
t.Errorf("SetSpanAttributesOnStart: -got +want %s", diff)
|
||||
@ -329,8 +331,9 @@ func TestSetSpanAttributes(t *testing.T) {
|
||||
Attributes: []kv.KeyValue{
|
||||
kv.String("key1", "value1"),
|
||||
},
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
HasRemoteParent: true,
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
HasRemoteParent: true,
|
||||
InstrumentationLibrary: instrumentation.Library{Name: "SpanAttribute"},
|
||||
}
|
||||
if diff := cmpDiff(got, want); diff != "" {
|
||||
t.Errorf("SetSpanAttributes: -got +want %s", diff)
|
||||
@ -365,9 +368,10 @@ func TestSetSpanAttributesOverLimit(t *testing.T) {
|
||||
kv.Bool("key1", false),
|
||||
kv.Int64("key4", 4),
|
||||
},
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
HasRemoteParent: true,
|
||||
DroppedAttributeCount: 1,
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
HasRemoteParent: true,
|
||||
DroppedAttributeCount: 1,
|
||||
InstrumentationLibrary: instrumentation.Library{Name: "SpanAttributesOverLimit"},
|
||||
}
|
||||
if diff := cmpDiff(got, want); diff != "" {
|
||||
t.Errorf("SetSpanAttributesOverLimit: -got +want %s", diff)
|
||||
@ -411,7 +415,8 @@ func TestEvents(t *testing.T) {
|
||||
{Name: "foo", Attributes: []kv.KeyValue{k1v1}},
|
||||
{Name: "bar", Attributes: []kv.KeyValue{k2v2, k3v3}},
|
||||
},
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
InstrumentationLibrary: instrumentation.Library{Name: "Events"},
|
||||
}
|
||||
if diff := cmpDiff(got, want); diff != "" {
|
||||
t.Errorf("Message Events: -got +want %s", diff)
|
||||
@ -463,6 +468,7 @@ func TestEventsOverLimit(t *testing.T) {
|
||||
DroppedMessageEventCount: 2,
|
||||
HasRemoteParent: true,
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
InstrumentationLibrary: instrumentation.Library{Name: "EventsOverLimit"},
|
||||
}
|
||||
if diff := cmpDiff(got, want); diff != "" {
|
||||
t.Errorf("Message Event over limit: -got +want %s", diff)
|
||||
@ -505,7 +511,8 @@ func TestLinks(t *testing.T) {
|
||||
{SpanContext: sc1, Attributes: []kv.KeyValue{k1v1}},
|
||||
{SpanContext: sc2, Attributes: []kv.KeyValue{k2v2, k3v3}},
|
||||
},
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
InstrumentationLibrary: instrumentation.Library{Name: "Links"},
|
||||
}
|
||||
if diff := cmpDiff(got, want); diff != "" {
|
||||
t.Errorf("Link: -got +want %s", diff)
|
||||
@ -547,9 +554,10 @@ func TestLinksOverLimit(t *testing.T) {
|
||||
{SpanContext: sc2, Attributes: []kv.KeyValue{k2v2}},
|
||||
{SpanContext: sc3, Attributes: []kv.KeyValue{k3v3}},
|
||||
},
|
||||
DroppedLinkCount: 1,
|
||||
HasRemoteParent: true,
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
DroppedLinkCount: 1,
|
||||
HasRemoteParent: true,
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
InstrumentationLibrary: instrumentation.Library{Name: "LinksOverLimit"},
|
||||
}
|
||||
if diff := cmpDiff(got, want); diff != "" {
|
||||
t.Errorf("Link over limit: -got +want %s", diff)
|
||||
@ -594,12 +602,13 @@ func TestSetSpanStatus(t *testing.T) {
|
||||
TraceID: tid,
|
||||
TraceFlags: 0x1,
|
||||
},
|
||||
ParentSpanID: sid,
|
||||
Name: "span0",
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
StatusCode: codes.Canceled,
|
||||
StatusMessage: "canceled",
|
||||
HasRemoteParent: true,
|
||||
ParentSpanID: sid,
|
||||
Name: "span0",
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
StatusCode: codes.Canceled,
|
||||
StatusMessage: "canceled",
|
||||
HasRemoteParent: true,
|
||||
InstrumentationLibrary: instrumentation.Library{Name: "SpanStatus"},
|
||||
}
|
||||
if diff := cmpDiff(got, want); diff != "" {
|
||||
t.Errorf("SetSpanStatus: -got +want %s", diff)
|
||||
@ -951,6 +960,7 @@ func TestRecordError(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
InstrumentationLibrary: instrumentation.Library{Name: "RecordError"},
|
||||
}
|
||||
if diff := cmpDiff(got, want); diff != "" {
|
||||
t.Errorf("SpanErrorOptions: -got +want %s", diff)
|
||||
@ -997,6 +1007,7 @@ func TestRecordErrorWithStatus(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
InstrumentationLibrary: instrumentation.Library{Name: "RecordErrorWithStatus"},
|
||||
}
|
||||
if diff := cmpDiff(got, want); diff != "" {
|
||||
t.Errorf("SpanErrorOptions: -got +want %s", diff)
|
||||
@ -1020,12 +1031,13 @@ func TestRecordErrorNil(t *testing.T) {
|
||||
TraceID: tid,
|
||||
TraceFlags: 0x1,
|
||||
},
|
||||
ParentSpanID: sid,
|
||||
Name: "span0",
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
HasRemoteParent: true,
|
||||
StatusCode: codes.OK,
|
||||
StatusMessage: "",
|
||||
ParentSpanID: sid,
|
||||
Name: "span0",
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
HasRemoteParent: true,
|
||||
StatusCode: codes.OK,
|
||||
StatusMessage: "",
|
||||
InstrumentationLibrary: instrumentation.Library{Name: "RecordErrorNil"},
|
||||
}
|
||||
if diff := cmpDiff(got, want); diff != "" {
|
||||
t.Errorf("SpanErrorOptions: -got +want %s", diff)
|
||||
@ -1092,9 +1104,44 @@ func TestWithResource(t *testing.T) {
|
||||
Attributes: []kv.KeyValue{
|
||||
kv.String("key1", "value1"),
|
||||
},
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
HasRemoteParent: true,
|
||||
Resource: resource.New(kv.String("rk1", "rv1"), kv.Int64("rk2", 5)),
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
HasRemoteParent: true,
|
||||
Resource: resource.New(kv.String("rk1", "rv1"), kv.Int64("rk2", 5)),
|
||||
InstrumentationLibrary: instrumentation.Library{Name: "WithResource"},
|
||||
}
|
||||
if diff := cmpDiff(got, want); diff != "" {
|
||||
t.Errorf("WithResource:\n -got +want %s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWithInstrumentationVersion(t *testing.T) {
|
||||
var te testExporter
|
||||
tp, _ := NewProvider(WithSyncer(&te))
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = apitrace.ContextWithRemoteSpanContext(ctx, remoteSpanContext())
|
||||
_, span := tp.Tracer(
|
||||
"WithInstrumentationVersion",
|
||||
apitrace.WithInstrumentationVersion("v0.1.0"),
|
||||
).Start(ctx, "span0", apitrace.WithRecord())
|
||||
got, err := endSpan(&te, span)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
|
||||
want := &export.SpanData{
|
||||
SpanContext: apitrace.SpanContext{
|
||||
TraceID: tid,
|
||||
TraceFlags: 0x1,
|
||||
},
|
||||
ParentSpanID: sid,
|
||||
Name: "span0",
|
||||
SpanKind: apitrace.SpanKindInternal,
|
||||
HasRemoteParent: true,
|
||||
InstrumentationLibrary: instrumentation.Library{
|
||||
Name: "WithInstrumentationVersion",
|
||||
Version: "v0.1.0",
|
||||
},
|
||||
}
|
||||
if diff := cmpDiff(got, want); diff != "" {
|
||||
t.Errorf("WithResource:\n -got +want %s", diff)
|
||||
|
Loading…
x
Reference in New Issue
Block a user