mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-07-17 01:12:45 +02:00

Add export timestamps; distinguish Accumulation vs. Record (#835)

* Introduce Accumulation

* Refactor export structs

* FTB exporters

* Test timestamps

* Test no-start case

* From feedback

* Apply suggestions from code review

(Thanks @MrAlias!)

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* Comments in sdk/metric/integrator/test

* Fix build

* Comments and feedback

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
Joshua MacDonald
2020-06-18 10:16:33 -07:00
committed by GitHub
parent c36fcd2dc4
commit 4e4271791f
25 changed files with 513 additions and 197 deletions
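
In short, this change splits the old export.Record into two types: an Accumulation, which the Accumulator hands to the Integrator (Descriptor, Labels, Resource, and a checkpointed Aggregator), and a Record, which the Integrator hands to the Exporter (the same metadata plus a read-only aggregation.Aggregation and the start/end times of the collection interval). A minimal sketch of the resulting collection cycle, following the push-controller diff below (error handling abbreviated):

    // One collection cycle under the new API (sketch).
    c.integrator.StartCollection()
    c.accumulator.Collect(ctx) // calls integrator.Process(export.Accumulation) per updated instrument
    if err := c.integrator.FinishCollection(); err != nil {
        global.Handle(err)
    }
    // ForEach on the CheckpointSet now yields export.Record values whose
    // StartTime()/EndTime() cover the interval just finished.
    if err := c.exporter.Export(ctx, c.integrator.CheckpointSet()); err != nil {
        global.Handle(err)
    }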

View File

@@ -54,17 +54,10 @@ func newFixture(b *testing.B) *benchFixture {
 	return bf
 }
 
-func (*benchFixture) Process(export.Record) error {
+func (*benchFixture) Process(export.Accumulation) error {
 	return nil
 }
 
-func (*benchFixture) CheckpointSet() export.CheckpointSet {
-	return nil
-}
-
-func (*benchFixture) FinishedCollection() {
-}
-
 func (fix *benchFixture) Meter(_ string, _ ...metric.MeterOption) metric.Meter {
 	return fix.meter
 }

View File

@@ -205,10 +205,12 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
 	defer c.exp.lock.RUnlock()
 
 	ctrl := c.exp.Controller()
-	ctrl.Collect(context.Background())
+	if err := ctrl.Collect(context.Background()); err != nil {
+		global.Handle(err)
+	}
 
 	err := ctrl.ForEach(func(record export.Record) error {
-		agg := record.Aggregator()
+		agg := record.Aggregation()
 		numberKind := record.Descriptor().NumberKind()
 
 		var labelKeys, labels []string

View File

@@ -159,7 +159,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
 	}
 
 	aggError = checkpointSet.ForEach(func(record export.Record) error {
 		desc := record.Descriptor()
-		agg := record.Aggregator()
+		agg := record.Aggregation()
 		kind := desc.NumberKind()
 		encodedResource := record.Resource().Encoded(e.config.LabelEncoder)

View File

@@ -19,6 +19,7 @@ import (
 	"errors"
 	"reflect"
 	"sync"
+	"time"
 
 	"go.opentelemetry.io/otel/api/kv"
 	"go.opentelemetry.io/otel/api/label"
@@ -47,20 +48,30 @@ type NoopAggregator struct{}
 var _ export.Aggregator = (*NoopAggregator)(nil)
 
 // Update implements export.Aggregator.
-func (*NoopAggregator) Update(context.Context, metric.Number, *metric.Descriptor) error {
+func (NoopAggregator) Update(context.Context, metric.Number, *metric.Descriptor) error {
 	return nil
 }
 
 // SynchronizedCopy implements export.Aggregator.
-func (*NoopAggregator) SynchronizedCopy(export.Aggregator, *metric.Descriptor) error {
+func (NoopAggregator) SynchronizedCopy(export.Aggregator, *metric.Descriptor) error {
 	return nil
 }
 
 // Merge implements export.Aggregator.
-func (*NoopAggregator) Merge(export.Aggregator, *metric.Descriptor) error {
+func (NoopAggregator) Merge(export.Aggregator, *metric.Descriptor) error {
 	return nil
 }
 
+// Aggregation returns an interface for reading the state of this aggregator.
+func (NoopAggregator) Aggregation() aggregation.Aggregation {
+	return NoopAggregator{}
+}
+
+// Kind implements aggregation.Aggregation.
+func (NoopAggregator) Kind() aggregation.Kind {
+	return aggregation.Kind("Noop")
+}
+
 // NewCheckpointSet returns a test CheckpointSet that new records could be added.
 // Records are grouped by their encoded labels.
 func NewCheckpointSet(resource *resource.Resource) *CheckpointSet {
@@ -88,10 +99,10 @@ func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, l
 		distinct: elabels.Equivalent(),
 	}
 	if record, ok := p.records[key]; ok {
-		return record.Aggregator(), false
+		return record.Aggregation().(export.Aggregator), false
 	}
 
-	rec := export.NewRecord(desc, &elabels, p.resource, newAgg)
+	rec := export.NewRecord(desc, &elabels, p.resource, newAgg.Aggregation(), time.Time{}, time.Time{})
 	p.updates = append(p.updates, rec)
 	p.records[key] = rec
 	return newAgg, true

View File

@@ -234,20 +234,20 @@ func sink(ctx context.Context, in <-chan result) ([]*metricpb.ResourceMetrics, e
 // Record transforms a Record into an OTLP Metric. An ErrUnimplementedAgg
 // error is returned if the Record Aggregator is not supported.
 func Record(r export.Record) (*metricpb.Metric, error) {
-	d := r.Descriptor()
-	l := r.Labels()
-	switch a := r.Aggregator().(type) {
+	switch a := r.Aggregation().(type) {
 	case aggregation.MinMaxSumCount:
-		return minMaxSumCount(d, l, a)
+		return minMaxSumCount(r, a)
 	case aggregation.Sum:
-		return sum(d, l, a)
+		return sum(r, a)
 	default:
 		return nil, fmt.Errorf("%w: %v", ErrUnimplementedAgg, a)
 	}
 }
 
 // sum transforms a Sum Aggregator into an OTLP Metric.
-func sum(desc *metric.Descriptor, labels *label.Set, a aggregation.Sum) (*metricpb.Metric, error) {
+func sum(record export.Record, a aggregation.Sum) (*metricpb.Metric, error) {
+	desc := record.Descriptor()
+	labels := record.Labels()
 	sum, err := a.Sum()
 	if err != nil {
 		return nil, err
@@ -266,12 +266,20 @@ func sum(record export.Record, a aggregation.Sum) (*metricpb.Metric, error) {
 	case metric.Int64NumberKind, metric.Uint64NumberKind:
 		m.MetricDescriptor.Type = metricpb.MetricDescriptor_COUNTER_INT64
 		m.Int64DataPoints = []*metricpb.Int64DataPoint{
-			{Value: sum.CoerceToInt64(n)},
+			{
+				Value:             sum.CoerceToInt64(n),
+				StartTimeUnixNano: uint64(record.StartTime().UnixNano()),
+				TimeUnixNano:      uint64(record.EndTime().UnixNano()),
+			},
 		}
 	case metric.Float64NumberKind:
 		m.MetricDescriptor.Type = metricpb.MetricDescriptor_COUNTER_DOUBLE
 		m.DoubleDataPoints = []*metricpb.DoubleDataPoint{
-			{Value: sum.CoerceToFloat64(n)},
+			{
+				Value:             sum.CoerceToFloat64(n),
+				StartTimeUnixNano: uint64(record.StartTime().UnixNano()),
+				TimeUnixNano:      uint64(record.EndTime().UnixNano()),
+			},
 		}
 	default:
 		return nil, fmt.Errorf("%w: %v", ErrUnknownValueType, n)
@@ -299,7 +307,9 @@ func minMaxSumCountValues(a aggregation.MinMaxSumCount) (min, max, sum metric.Nu
 }
 
 // minMaxSumCount transforms a MinMaxSumCount Aggregator into an OTLP Metric.
-func minMaxSumCount(desc *metric.Descriptor, labels *label.Set, a aggregation.MinMaxSumCount) (*metricpb.Metric, error) {
+func minMaxSumCount(record export.Record, a aggregation.MinMaxSumCount) (*metricpb.Metric, error) {
+	desc := record.Descriptor()
+	labels := record.Labels()
 	min, max, sum, count, err := minMaxSumCountValues(a)
 	if err != nil {
 		return nil, err
@@ -328,6 +338,8 @@ func minMaxSumCount(record export.Record, a aggregation.MinMaxSumCount) (*metric
 						Value: max.CoerceToFloat64(numKind),
 					},
 				},
+				StartTimeUnixNano: uint64(record.StartTime().UnixNano()),
+				TimeUnixNano:      uint64(record.EndTime().UnixNano()),
 			},
 		},
 	}, nil
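
OTLP data points carry their timestamps as uint64 nanoseconds, so the transform converts the Record's time.Time values with UnixNano(). A hedged sketch of that conversion as a standalone helper (toNanos is hypothetical, not part of this commit; it adds a zero-time guard that the diff above does not need because the Integrator always supplies both times):

    // toNanos converts a time.Time to OTLP's uint64 nanosecond form.
    // A zero time maps to 0 instead of a wrapped negative UnixNano().
    func toNanos(t time.Time) uint64 {
        if t.IsZero() {
            return 0
        }
        return uint64(t.UnixNano())
    }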

View File

@@ -18,6 +18,7 @@ import (
 	"context"
 	"errors"
 	"testing"
+	"time"
 
 	commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
 	metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
@@ -29,11 +30,19 @@ import (
 	"go.opentelemetry.io/otel/api/metric"
 	"go.opentelemetry.io/otel/api/unit"
 	"go.opentelemetry.io/otel/exporters/metric/test"
+	export "go.opentelemetry.io/otel/sdk/export/metric"
 	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
 	"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
 	sumAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
 )
 
+var (
+	// Timestamps used in this test:
+
+	intervalStart = time.Now()
+	intervalEnd   = intervalStart.Add(time.Hour)
+)
+
 func TestStringKeyValues(t *testing.T) {
 	tests := []struct {
 		kvs []kv.KeyValue
@@ -155,7 +164,8 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
 			metric.WithDescription(test.description),
 			metric.WithUnit(test.unit))
 		labels := label.NewSet(test.labels...)
-		got, err := minMaxSumCount(&desc, &labels, ckpt.(aggregation.MinMaxSumCount))
+		record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
+		got, err := minMaxSumCount(record, ckpt.(aggregation.MinMaxSumCount))
 		if assert.NoError(t, err) {
 			assert.Equal(t, test.expected, got.MetricDescriptor)
 		}
@@ -184,9 +194,12 @@ func TestMinMaxSumCountDatapoints(t *testing.T) {
 					Value: 10,
 				},
 			},
+			StartTimeUnixNano: uint64(intervalStart.UnixNano()),
+			TimeUnixNano:      uint64(intervalEnd.UnixNano()),
 		},
 	}
-	m, err := minMaxSumCount(&desc, &labels, ckpt.(aggregation.MinMaxSumCount))
+	record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
+	m, err := minMaxSumCount(record, ckpt.(aggregation.MinMaxSumCount))
 	if assert.NoError(t, err) {
 		assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
 		assert.Equal(t, []*metricpb.DoubleDataPoint(nil), m.DoubleDataPoints)
@@ -253,7 +266,9 @@ func TestSumMetricDescriptor(t *testing.T) {
 			metric.WithUnit(test.unit),
 		)
 		labels := label.NewSet(test.labels...)
-		got, err := sum(&desc, &labels, &sumAgg.New(1)[0])
+		emptyAgg := &sumAgg.New(1)[0]
+		record := export.NewRecord(&desc, &labels, nil, emptyAgg, intervalStart, intervalEnd)
+		got, err := sum(record, emptyAgg)
 		if assert.NoError(t, err) {
 			assert.Equal(t, test.expected, got.MetricDescriptor)
 		}
@@ -266,8 +281,13 @@ func TestSumInt64DataPoints(t *testing.T) {
 	s, ckpt := test.Unslice2(sumAgg.New(2))
 	assert.NoError(t, s.Update(context.Background(), metric.Number(1), &desc))
 	require.NoError(t, s.SynchronizedCopy(ckpt, &desc))
-	if m, err := sum(&desc, &labels, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
-		assert.Equal(t, []*metricpb.Int64DataPoint{{Value: 1}}, m.Int64DataPoints)
+	record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
+	if m, err := sum(record, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
+		assert.Equal(t, []*metricpb.Int64DataPoint{{
+			Value:             1,
+			StartTimeUnixNano: uint64(intervalStart.UnixNano()),
+			TimeUnixNano:      uint64(intervalEnd.UnixNano()),
+		}}, m.Int64DataPoints)
 		assert.Equal(t, []*metricpb.DoubleDataPoint(nil), m.DoubleDataPoints)
 		assert.Equal(t, []*metricpb.HistogramDataPoint(nil), m.HistogramDataPoints)
 		assert.Equal(t, []*metricpb.SummaryDataPoint(nil), m.SummaryDataPoints)
@@ -280,9 +300,14 @@ func TestSumFloat64DataPoints(t *testing.T) {
 	s, ckpt := test.Unslice2(sumAgg.New(2))
 	assert.NoError(t, s.Update(context.Background(), metric.NewFloat64Number(1), &desc))
 	require.NoError(t, s.SynchronizedCopy(ckpt, &desc))
-	if m, err := sum(&desc, &labels, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
+	record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd)
+	if m, err := sum(record, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
 		assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
-		assert.Equal(t, []*metricpb.DoubleDataPoint{{Value: 1}}, m.DoubleDataPoints)
+		assert.Equal(t, []*metricpb.DoubleDataPoint{{
+			Value:             1,
+			StartTimeUnixNano: uint64(intervalStart.UnixNano()),
+			TimeUnixNano:      uint64(intervalEnd.UnixNano()),
+		}}, m.DoubleDataPoints)
 		assert.Equal(t, []*metricpb.HistogramDataPoint(nil), m.HistogramDataPoints)
 		assert.Equal(t, []*metricpb.SummaryDataPoint(nil), m.SummaryDataPoints)
 	}
@@ -292,7 +317,8 @@ func TestSumErrUnknownValueType(t *testing.T) {
 	desc := metric.NewDescriptor("", metric.ValueRecorderKind, metric.NumberKind(-1))
 	labels := label.NewSet()
 	s := &sumAgg.New(1)[0]
-	_, err := sum(&desc, &labels, s)
+	record := export.NewRecord(&desc, &labels, nil, s, intervalStart, intervalEnd)
+	_, err := sum(record, s)
 	assert.Error(t, err)
 	if !errors.Is(err, ErrUnknownValueType) {
 		t.Errorf("expected ErrUnknownValueType, got %v", err)

View File

@@ -18,6 +18,7 @@ import (
 	"context"
 	"sync"
 	"testing"
+	"time"
 
 	colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1"
 	commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
@@ -39,6 +40,21 @@ import (
 	"google.golang.org/grpc"
 )
 
+var (
+	// Timestamps used in this test:
+
+	intervalStart = time.Now()
+	intervalEnd   = intervalStart.Add(time.Hour)
+)
+
+func startTime() uint64 {
+	return uint64(intervalStart.UnixNano())
+}
+
+func pointTime() uint64 {
+	return uint64(intervalEnd.UnixNano())
+}
+
 type metricsServiceClientStub struct {
 	rm []metricpb.ResourceMetrics
 }
@@ -171,7 +187,9 @@ func TestNoGroupingExport(t *testing.T) {
 					MetricDescriptor: cpu1MD,
 					Int64DataPoints: []*metricpb.Int64DataPoint{
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 					},
 				},
@@ -179,7 +197,9 @@
 					MetricDescriptor: cpu2MD,
 					Int64DataPoints: []*metricpb.Int64DataPoint{
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 					},
 				},
@@ -235,6 +255,8 @@ func TestValuerecorderMetricGroupingExport(t *testing.T) {
 								Value: 10.0,
 							},
 						},
+						StartTimeUnixNano: startTime(),
+						TimeUnixNano:      pointTime(),
 					},
 					{
 						Count: 2,
@@ -249,6 +271,8 @@
 								Value: 10.0,
 							},
 						},
+						StartTimeUnixNano: startTime(),
+						TimeUnixNano:      pointTime(),
 					},
 				},
 			},
@@ -287,10 +311,14 @@ func TestCountInt64MetricGroupingExport(t *testing.T) {
 					MetricDescriptor: cpu1MD,
 					Int64DataPoints: []*metricpb.Int64DataPoint{
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 					},
 				},
@@ -337,10 +365,14 @@ func TestCountUint64MetricGroupingExport(t *testing.T) {
 					},
 					Int64DataPoints: []*metricpb.Int64DataPoint{
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 					},
 				},
@@ -387,10 +419,14 @@ func TestCountFloat64MetricGroupingExport(t *testing.T) {
 					},
 					DoubleDataPoints: []*metricpb.DoubleDataPoint{
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 					},
 				},
@@ -449,10 +485,14 @@ func TestResourceMetricGroupingExport(t *testing.T) {
 					MetricDescriptor: cpu1MD,
 					Int64DataPoints: []*metricpb.Int64DataPoint{
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 					},
 				},
@@ -460,7 +500,9 @@
 					MetricDescriptor: cpu2MD,
 					Int64DataPoints: []*metricpb.Int64DataPoint{
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 					},
 				},
@@ -477,7 +519,9 @@
 					MetricDescriptor: cpu1MD,
 					Int64DataPoints: []*metricpb.Int64DataPoint{
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 					},
 				},
@@ -567,10 +611,14 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
 					MetricDescriptor: cpu1MD,
 					Int64DataPoints: []*metricpb.Int64DataPoint{
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 					},
 				},
@@ -578,7 +626,9 @@
 					MetricDescriptor: cpu2MD,
 					Int64DataPoints: []*metricpb.Int64DataPoint{
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 					},
 				},
@@ -594,7 +644,9 @@
 					MetricDescriptor: cpu1MD,
 					Int64DataPoints: []*metricpb.Int64DataPoint{
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 					},
 				},
@@ -609,7 +661,9 @@
 					MetricDescriptor: cpu1MD,
 					Int64DataPoints: []*metricpb.Int64DataPoint{
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 					},
 				},
@@ -630,7 +684,9 @@
 					MetricDescriptor: cpu1MD,
 					Int64DataPoints: []*metricpb.Int64DataPoint{
 						{
-							Value: 11,
+							Value:             11,
+							StartTimeUnixNano: startTime(),
+							TimeUnixNano:      pointTime(),
 						},
 					},
 				},
@@ -689,7 +745,7 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
 		equiv := r.resource.Equivalent()
 		resources[equiv] = r.resource
 
-		recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, r.resource, ckpt))
+		recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, r.resource, ckpt.Aggregation(), intervalStart, intervalEnd))
 	}
 	for _, records := range recs {
 		assert.NoError(t, exp.Export(context.Background(), &checkpointSet{records: records}))

View File

@@ -17,9 +17,11 @@ package metric // import "go.opentelemetry.io/otel/sdk/export/metric"
 
 import (
 	"context"
 	"sync"
+	"time"
 
 	"go.opentelemetry.io/otel/api/label"
 	"go.opentelemetry.io/otel/api/metric"
+	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
 	"go.opentelemetry.io/otel/sdk/resource"
 )
@@ -61,13 +63,13 @@ type Integrator interface {
 	AggregationSelector
 
 	// Process is called by the SDK once per internal record,
-	// passing the export Record (a Descriptor, the corresponding
+	// passing the export Accumulation (a Descriptor, the corresponding
 	// Labels, and the checkpointed Aggregator). This call has no
 	// Context argument because it is expected to perform only
 	// computation. An SDK is not expected to call exporters from
 	// within Process, use a controller for that (see
 	// ./controllers/{pull,push}.
-	Process(record Record) error
+	Process(Accumulation) error
 }
 
@@ -105,6 +107,13 @@ type AggregationSelector interface {
 // to attach a Sum aggregator to a ValueRecorder instrument or a
 // MinMaxSumCount aggregator to a Counter instrument.
 type Aggregator interface {
+	// Aggregation returns an Aggregation interface to access the
+	// current state of this Aggregator. The caller is
+	// responsible for synchronization and must not call any of the
+	// other methods in this interface concurrently while using
+	// the Aggregation.
+	Aggregation() aggregation.Aggregation
+
 	// Update receives a new measured value and incorporates it
 	// into the aggregation. Update() calls may be called
 	// concurrently.
@@ -189,45 +198,97 @@ type CheckpointSet interface {
 	RUnlock()
 }
 
-// Record contains the exported data for a single metric instrument
-// and label set.
-type Record struct {
+// Metadata contains the common elements for exported metric data that
+// are shared by the Accumulator->Integrator and Integrator->Exporter
+// steps.
+type Metadata struct {
 	descriptor *metric.Descriptor
 	labels     *label.Set
 	resource   *resource.Resource
+}
+
+// Accumulation contains the exported data for a single metric instrument
+// and label set, as prepared by an Accumulator for the Integrator.
+type Accumulation struct {
+	Metadata
 	aggregator Aggregator
 }
 
-// NewRecord allows Integrator implementations to construct export
-// records. The Descriptor, Labels, and Aggregator represent
-// aggregate metric events received over a single collection period.
-func NewRecord(descriptor *metric.Descriptor, labels *label.Set, resource *resource.Resource, aggregator Aggregator) Record {
-	return Record{
-		descriptor: descriptor,
-		labels:     labels,
-		resource:   resource,
+// Record contains the exported data for a single metric instrument
+// and label set, as prepared by the Integrator for the Exporter.
+// This includes the effective start and end time for the aggregation.
+type Record struct {
+	Metadata
+	aggregation aggregation.Aggregation
+	start       time.Time
+	end         time.Time
+}
+
+// Descriptor describes the metric instrument being exported.
+func (m Metadata) Descriptor() *metric.Descriptor {
+	return m.descriptor
+}
+
+// Labels describes the labels associated with the instrument and the
+// aggregated data.
+func (m Metadata) Labels() *label.Set {
+	return m.labels
+}
+
+// Resource contains common attributes that apply to this metric event.
+func (m Metadata) Resource() *resource.Resource {
+	return m.resource
+}
+
+// NewAccumulation allows Accumulator implementations to construct new
+// Accumulations to send to Integrators. The Descriptor, Labels, Resource,
+// and Aggregator represent aggregate metric events received over a single
+// collection period.
+func NewAccumulation(descriptor *metric.Descriptor, labels *label.Set, resource *resource.Resource, aggregator Aggregator) Accumulation {
+	return Accumulation{
+		Metadata: Metadata{
+			descriptor: descriptor,
+			labels:     labels,
+			resource:   resource,
+		},
 		aggregator: aggregator,
 	}
 }
 
 // Aggregator returns the checkpointed aggregator. It is safe to
 // access the checkpointed state without locking.
-func (r Record) Aggregator() Aggregator {
+func (r Accumulation) Aggregator() Aggregator {
 	return r.aggregator
 }
 
-// Descriptor describes the metric instrument being exported.
-func (r Record) Descriptor() *metric.Descriptor {
-	return r.descriptor
+// NewRecord allows Integrator implementations to construct export
+// records. The Descriptor, Labels, and Aggregator represent
+// aggregate metric events received over a single collection period.
+func NewRecord(descriptor *metric.Descriptor, labels *label.Set, resource *resource.Resource, aggregation aggregation.Aggregation, start, end time.Time) Record {
+	return Record{
+		Metadata: Metadata{
+			descriptor: descriptor,
+			labels:     labels,
+			resource:   resource,
+		},
+		aggregation: aggregation,
+		start:       start,
+		end:         end,
+	}
 }
 
-// Labels describes the labels associated with the instrument and the
-// aggregated data.
-func (r Record) Labels() *label.Set {
-	return r.labels
+// Aggregation returns the aggregation, an interface to the record and
+// its aggregator, dependent on the kind of both the input and exporter.
+func (r Record) Aggregation() aggregation.Aggregation {
+	return r.aggregation
 }
 
-// Resource contains common attributes that apply to this metric event.
-func (r Record) Resource() *resource.Resource {
-	return r.resource
+// StartTime is the start time of the interval covered by this aggregation.
+func (r Record) StartTime() time.Time {
+	return r.start
+}
+
+// EndTime is the end time of the interval covered by this aggregation.
+func (r Record) EndTime() time.Time {
+	return r.end
 }
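
Construction at the two pipeline boundaries now looks like this (a sketch; desc, labels, res, agg, start, and end stand in for values from the surrounding SDK code):

    // Accumulator -> Integrator: carries the mutable checkpointed Aggregator.
    acc := export.NewAccumulation(desc, labels, res, agg)
    _ = acc.Aggregator() // safe to read without locking

    // Integrator -> Exporter: carries a read-only Aggregation plus the
    // effective interval covered by the aggregation.
    rec := export.NewRecord(desc, labels, res, agg.Aggregation(), start, end)
    _, _, _ = rec.Aggregation(), rec.StartTime(), rec.EndTime()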

View File

@@ -51,6 +51,11 @@ func New(cnt int) []Aggregator {
 	return make([]Aggregator, cnt)
 }
 
+// Aggregation returns an interface for reading the state of this aggregator.
+func (c *Aggregator) Aggregation() aggregation.Aggregation {
+	return c
+}
+
 // Kind returns aggregation.ExactKind.
 func (c *Aggregator) Kind() aggregation.Kind {
 	return aggregation.ExactKind

View File

@@ -54,6 +54,11 @@ func New(cnt int, desc *metric.Descriptor, cfg *Config) []Aggregator {
 	return aggs
 }
 
+// Aggregation returns an interface for reading the state of this aggregator.
+func (c *Aggregator) Aggregation() aggregation.Aggregation {
+	return c
+}
+
 // Kind returns aggregation.SketchKind.
 func (c *Aggregator) Kind() aggregation.Kind {
 	return aggregation.SketchKind

View File

@@ -83,6 +83,11 @@ func New(cnt int, desc *metric.Descriptor, boundaries []float64) []Aggregator {
 	return aggs
 }
 
+// Aggregation returns an interface for reading the state of this aggregator.
+func (c *Aggregator) Aggregation() aggregation.Aggregation {
+	return c
+}
+
 // Kind returns aggregation.HistogramKind.
 func (c *Aggregator) Kind() aggregation.Kind {
 	return aggregation.HistogramKind

View File

@@ -68,6 +68,11 @@ func New(cnt int) []Aggregator {
 	return aggs
 }
 
+// Aggregation returns an interface for reading the state of this aggregator.
+func (g *Aggregator) Aggregation() aggregation.Aggregation {
+	return g
+}
+
 // Kind returns aggregation.LastValueKind.
 func (g *Aggregator) Kind() aggregation.Kind {
 	return aggregation.LastValueKind

View File

@@ -61,6 +61,11 @@ func New(cnt int, desc *metric.Descriptor) []Aggregator {
 	return aggs
 }
 
+// Aggregation returns an interface for reading the state of this aggregator.
+func (c *Aggregator) Aggregation() aggregation.Aggregation {
+	return c
+}
+
 // Kind returns aggregation.MinMaxSumCountKind.
 func (c *Aggregator) Kind() aggregation.Kind {
 	return aggregation.MinMaxSumCountKind

View File

@@ -40,6 +40,11 @@ func New(cnt int) []Aggregator {
 	return make([]Aggregator, cnt)
 }
 
+// Aggregation returns an interface for reading the state of this aggregator.
+func (c *Aggregator) Aggregation() aggregation.Aggregation {
+	return c
+}
+
 // Kind returns aggregation.SumKind.
 func (c *Aggregator) Kind() aggregation.Kind {
 	return aggregation.SumKind
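
Each of the six aggregators above satisfies the new Aggregator.Aggregation() method by returning itself, so consumers type-switch on the read-only interface rather than on the concrete aggregator. A sketch of the consumer side, mirroring the OTLP transform earlier in this commit:

    // Reading a Record without touching the mutable Aggregator (sketch).
    switch a := rec.Aggregation().(type) {
    case aggregation.Sum:
        if v, err := a.Sum(); err == nil {
            fmt.Println("sum:", v)
        }
    case aggregation.LastValue:
        if v, ts, err := a.LastValue(); err == nil {
            fmt.Println("last value:", v, "observed at", ts)
        }
    default:
        fmt.Println("unhandled aggregation kind:", a.Kind())
    }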

View File

@@ -47,17 +47,10 @@ func newFixture(b *testing.B) *benchFixture {
 	return bf
 }
 
-func (f *benchFixture) Process(rec export.Record) error {
+func (f *benchFixture) Process(export.Accumulation) error {
 	return nil
 }
 
-func (*benchFixture) CheckpointSet() export.CheckpointSet {
-	return nil
-}
-
-func (*benchFixture) FinishedCollection() {
-}
-
 func makeManyLabels(n int) [][]kv.KeyValue {
 	r := make([][]kv.KeyValue, n)

View File

@@ -92,7 +92,7 @@ func (c *Controller) ForEach(f func(export.Record) error) error {
 
 // Collect requests a collection. The collection will be skipped if
 // the last collection is aged less than the CachePeriod.
-func (c *Controller) Collect(ctx context.Context) {
+func (c *Controller) Collect(ctx context.Context) error {
 	c.integrator.Lock()
 	defer c.integrator.Unlock()
 
@@ -101,11 +101,14 @@ func (c *Controller) Collect(ctx context.Context) error {
 		elapsed := now.Sub(c.lastCollect)
 		if elapsed < c.period {
-			return
+			return nil
 		}
 		c.lastCollect = now
 	}
 
+	c.integrator.StartCollection()
 	c.accumulator.Collect(ctx)
+	err := c.integrator.FinishCollection()
 	c.checkpoint = c.integrator.CheckpointSet()
+	return err
 }
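
Since Collect now propagates the Integrator's FinishCollection error, callers of the pull controller are expected to check it (a sketch matching the updated tests below):

    if err := puller.Collect(ctx); err != nil {
        global.Handle(err) // or surface it to the scrape handler
    }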

View File

@@ -44,9 +44,9 @@ func TestPullNoCache(t *testing.T) {
 	counter.Add(ctx, 10, kv.String("A", "B"))
 
-	puller.Collect(ctx)
+	require.NoError(t, puller.Collect(ctx))
 	records := test.NewOutput(label.DefaultEncoder())
-	_ = puller.ForEach(records.AddTo)
+	require.NoError(t, puller.ForEach(records.AddRecord))
 
 	require.EqualValues(t, map[string]float64{
 		"counter/A=B/": 10,
@@ -54,9 +54,9 @@ func TestPullNoCache(t *testing.T) {
 	counter.Add(ctx, 10, kv.String("A", "B"))
 
-	puller.Collect(ctx)
+	require.NoError(t, puller.Collect(ctx))
 	records = test.NewOutput(label.DefaultEncoder())
-	_ = puller.ForEach(records.AddTo)
+	require.NoError(t, puller.ForEach(records.AddRecord))
 
 	require.EqualValues(t, map[string]float64{
 		"counter/A=B/": 20,
@@ -78,9 +78,9 @@ func TestPullWithCache(t *testing.T) {
 	counter.Add(ctx, 10, kv.String("A", "B"))
 
-	puller.Collect(ctx)
+	require.NoError(t, puller.Collect(ctx))
 	records := test.NewOutput(label.DefaultEncoder())
-	_ = puller.ForEach(records.AddTo)
+	require.NoError(t, puller.ForEach(records.AddRecord))
 
 	require.EqualValues(t, map[string]float64{
 		"counter/A=B/": 10,
@@ -89,9 +89,9 @@ func TestPullWithCache(t *testing.T) {
 	counter.Add(ctx, 10, kv.String("A", "B"))
 
 	// Cached value!
-	puller.Collect(ctx)
+	require.NoError(t, puller.Collect(ctx))
 	records = test.NewOutput(label.DefaultEncoder())
-	_ = puller.ForEach(records.AddTo)
+	require.NoError(t, puller.ForEach(records.AddRecord))
 
 	require.EqualValues(t, map[string]float64{
 		"counter/A=B/": 10,
@@ -101,9 +101,9 @@ func TestPullWithCache(t *testing.T) {
 	runtime.Gosched()
 
 	// Re-computed value!
-	puller.Collect(ctx)
+	require.NoError(t, puller.Collect(ctx))
 	records = test.NewOutput(label.DefaultEncoder())
-	_ = puller.ForEach(records.AddTo)
+	require.NoError(t, puller.ForEach(records.AddRecord))
 
 	require.EqualValues(t, map[string]float64{
 		"counter/A=B/": 20,

View File

@@ -142,12 +142,13 @@ func (c *Controller) tick() {
 	c.integrator.Lock()
 	defer c.integrator.Unlock()
 
+	c.integrator.StartCollection()
 	c.accumulator.Collect(ctx)
+	if err := c.integrator.FinishCollection(); err != nil {
+		global.Handle(err)
+	}
 
-	err := c.exporter.Export(ctx, c.integrator.CheckpointSet())
-	c.integrator.FinishedCollection()
-	if err != nil {
+	if err := c.exporter.Export(ctx, c.integrator.CheckpointSet()); err != nil {
 		global.Handle(err)
 	}
 }

View File

@@ -170,7 +170,7 @@ func TestPushTicker(t *testing.T) {
 	require.Equal(t, "counter.sum", records[0].Descriptor().Name())
 	require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder()))
 
-	sum, err := records[0].Aggregator().(aggregation.Sum).Sum()
+	sum, err := records[0].Aggregation().(aggregation.Sum).Sum()
 	require.Equal(t, int64(3), sum.AsInt64())
 	require.Nil(t, err)
 
@@ -187,7 +187,7 @@ func TestPushTicker(t *testing.T) {
 	require.Equal(t, "counter.sum", records[0].Descriptor().Name())
 	require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder()))
 
-	sum, err = records[0].Aggregator().(aggregation.Sum).Sum()
+	sum, err = records[0].Aggregation().(aggregation.Sum).Sum()
 	require.Equal(t, int64(7), sum.AsInt64())
 	require.Nil(t, err)

View File

@@ -74,7 +74,7 @@ type correctnessIntegrator struct {
 	t *testing.T
 	*testSelector
 
-	records []export.Record
+	accumulations []export.Accumulation
 }
 
 type testSelector struct {
@@ -101,16 +101,8 @@ func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessInt
 	return meter, accum, integrator
 }
 
-func (ci *correctnessIntegrator) CheckpointSet() export.CheckpointSet {
-	ci.t.Fatal("Should not be called")
-	return nil
-}
-
-func (*correctnessIntegrator) FinishedCollection() {
-}
-
-func (ci *correctnessIntegrator) Process(record export.Record) error {
-	ci.records = append(ci.records, record)
+func (ci *correctnessIntegrator) Process(accumulation export.Accumulation) error {
+	ci.accumulations = append(ci.accumulations, accumulation)
 	return nil
 }
 
@@ -126,10 +118,10 @@ func TestInputRangeCounter(t *testing.T) {
 	checkpointed := sdk.Collect(ctx)
 	require.Equal(t, 0, checkpointed)
 
-	integrator.records = nil
+	integrator.accumulations = nil
 	counter.Add(ctx, 1)
 	checkpointed = sdk.Collect(ctx)
-	sum, err := integrator.records[0].Aggregator().(aggregation.Sum).Sum()
+	sum, err := integrator.accumulations[0].Aggregator().(aggregation.Sum).Sum()
 	require.Equal(t, int64(1), sum.AsInt64())
 	require.Equal(t, 1, checkpointed)
 	require.Nil(t, err)
@@ -148,7 +140,7 @@ func TestInputRangeUpDownCounter(t *testing.T) {
 	counter.Add(ctx, 1)
 
 	checkpointed := sdk.Collect(ctx)
-	sum, err := integrator.records[0].Aggregator().(aggregation.Sum).Sum()
+	sum, err := integrator.accumulations[0].Aggregator().(aggregation.Sum).Sum()
 	require.Equal(t, int64(1), sum.AsInt64())
 	require.Equal(t, 1, checkpointed)
 	require.Nil(t, err)
@@ -170,10 +162,10 @@ func TestInputRangeValueRecorder(t *testing.T) {
 	valuerecorder.Record(ctx, 1)
 	valuerecorder.Record(ctx, 2)
 
-	integrator.records = nil
+	integrator.accumulations = nil
 	checkpointed = sdk.Collect(ctx)
 
-	count, err := integrator.records[0].Aggregator().(aggregation.Distribution).Count()
+	count, err := integrator.accumulations[0].Aggregator().(aggregation.Distribution).Count()
 	require.Equal(t, int64(2), count)
 	require.Equal(t, 1, checkpointed)
 	require.Nil(t, testHandler.Flush())
@@ -190,7 +182,7 @@ func TestDisabledInstrument(t *testing.T) {
 	checkpointed := sdk.Collect(ctx)
 
 	require.Equal(t, 0, checkpointed)
-	require.Equal(t, 0, len(integrator.records))
+	require.Equal(t, 0, len(integrator.accumulations))
 }
 
 func TestRecordNaN(t *testing.T) {
@@ -258,7 +250,7 @@ func TestSDKLabelsDeduplication(t *testing.T) {
 	sdk.Collect(ctx)
 
 	var actual [][]kv.KeyValue
-	for _, rec := range integrator.records {
+	for _, rec := range integrator.accumulations {
 		sum, _ := rec.Aggregator().(aggregation.Sum).Sum()
 		require.Equal(t, sum, metric.NewInt64Number(2))
@@ -352,11 +344,11 @@ func TestObserverCollection(t *testing.T) {
 	collected := sdk.Collect(ctx)
 
-	require.Equal(t, collected, len(integrator.records))
+	require.Equal(t, collected, len(integrator.accumulations))
 
 	out := batchTest.NewOutput(label.DefaultEncoder())
-	for _, rec := range integrator.records {
-		_ = out.AddTo(rec)
+	for _, rec := range integrator.accumulations {
+		require.NoError(t, out.AddAccumulation(rec))
 	}
 	require.EqualValues(t, map[string]float64{
 		"float.valueobserver.lastvalue/A=B/R=V": -1,
@@ -397,7 +389,7 @@ func TestSumObserverInputRange(t *testing.T) {
 	collected := sdk.Collect(ctx)
 
 	require.Equal(t, 0, collected)
-	require.Equal(t, 0, len(integrator.records))
+	require.Equal(t, 0, len(integrator.accumulations))
 
 	// check that the error condition was reset
 	require.NoError(t, testHandler.Flush())
@@ -455,11 +447,11 @@ func TestObserverBatch(t *testing.T) {
 	collected := sdk.Collect(ctx)
 
-	require.Equal(t, collected, len(integrator.records))
+	require.Equal(t, collected, len(integrator.accumulations))
 
 	out := batchTest.NewOutput(label.DefaultEncoder())
-	for _, rec := range integrator.records {
-		_ = out.AddTo(rec)
+	for _, rec := range integrator.accumulations {
+		require.NoError(t, out.AddAccumulation(rec))
 	}
 	require.EqualValues(t, map[string]float64{
 		"float.sumobserver.sum//R=V": 1.1,
@@ -503,8 +495,8 @@ func TestRecordBatch(t *testing.T) {
 	sdk.Collect(ctx)
 
 	out := batchTest.NewOutput(label.DefaultEncoder())
-	for _, rec := range integrator.records {
-		_ = out.AddTo(rec)
+	for _, rec := range integrator.accumulations {
+		require.NoError(t, out.AddAccumulation(rec))
 	}
 	require.EqualValues(t, map[string]float64{
 		"int64.sum/A=B,C=D/R=V": 1,
@@ -585,8 +577,8 @@ func TestSyncInAsync(t *testing.T) {
 	sdk.Collect(ctx)
 
 	out := batchTest.NewOutput(label.DefaultEncoder())
-	for _, rec := range integrator.records {
-		_ = out.AddTo(rec)
+	for _, rec := range integrator.accumulations {
+		require.NoError(t, out.AddAccumulation(rec))
 	}
 	require.EqualValues(t, map[string]float64{
 		"counter.sum//R=V": 100,

View File

@@ -16,7 +16,9 @@ package simple // import "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
 
 import (
 	"errors"
+	"fmt"
 	"sync"
+	"time"
 
 	"go.opentelemetry.io/otel/api/label"
 	"go.opentelemetry.io/otel/api/metric"
@@ -48,35 +50,55 @@ type (
 		// RWMutex implements locking for the `CheckpointSet` interface.
 		sync.RWMutex
 		values map[batchKey]batchValue
+
+		// Note: the timestamp logic currently assumes all
+		// exports are deltas.
+
+		intervalStart time.Time
+		intervalEnd   time.Time
+
+		// startedCollection and finishedCollection are the
+		// number of StartCollection() and FinishCollection()
+		// calls, used to ensure that the sequence of starts
+		// and finishes is correctly balanced.
+		startedCollection  int64
+		finishedCollection int64
 	}
 )
 
 var _ export.Integrator = &Integrator{}
 var _ export.CheckpointSet = &batch{}
+var ErrInconsistentState = fmt.Errorf("inconsistent integrator state")
 
 func New(selector export.AggregationSelector, stateful bool) *Integrator {
 	return &Integrator{
 		AggregationSelector: selector,
 		stateful:            stateful,
 		batch: batch{
 			values: map[batchKey]batchValue{},
+			intervalStart: time.Now(),
 		},
 	}
 }
 
-func (b *Integrator) Process(record export.Record) error {
-	desc := record.Descriptor()
+func (b *Integrator) Process(accumulation export.Accumulation) error {
+	if b.startedCollection != b.finishedCollection+1 {
+		return ErrInconsistentState
+	}
+
+	desc := accumulation.Descriptor()
 	key := batchKey{
 		descriptor: desc,
-		distinct:   record.Labels().Equivalent(),
-		resource:   record.Resource().Equivalent(),
+		distinct:   accumulation.Labels().Equivalent(),
+		resource:   accumulation.Resource().Equivalent(),
 	}
-	agg := record.Aggregator()
+	agg := accumulation.Aggregator()
 	value, ok := b.batch.values[key]
 	if ok {
 		// Note: The call to Merge here combines only
-		// identical records. It is required even for a
-		// stateless Integrator because such identical records
+		// identical accumulations. It is required even for a
+		// stateless Integrator because such identical accumulations
 		// may arise in the Meter implementation due to race
 		// conditions.
 		return value.aggregator.Merge(agg, desc)
@@ -96,8 +118,8 @@ func (b *Integrator) Process(accumulation export.Accumulation) error {
 	}
 	b.batch.values[key] = batchValue{
 		aggregator: agg,
-		labels:     record.Labels(),
-		resource:   record.Resource(),
+		labels:     accumulation.Labels(),
+		resource:   accumulation.Resource(),
 	}
 	return nil
 }
@@ -106,19 +128,38 @@ func (b *Integrator) CheckpointSet() export.CheckpointSet {
 	return &b.batch
 }
 
-func (b *Integrator) FinishedCollection() {
+func (b *Integrator) StartCollection() {
+	if b.startedCollection != 0 {
+		b.intervalStart = b.intervalEnd
+	}
+	b.startedCollection++
 	if !b.stateful {
 		b.batch.values = map[batchKey]batchValue{}
 	}
 }
 
+func (b *Integrator) FinishCollection() error {
+	b.finishedCollection++
+	b.intervalEnd = time.Now()
+	if b.startedCollection != b.finishedCollection {
+		return ErrInconsistentState
+	}
+	return nil
+}
+
 func (b *batch) ForEach(f func(export.Record) error) error {
+	if b.startedCollection != b.finishedCollection {
+		return ErrInconsistentState
+	}
 	for key, value := range b.values {
 		if err := f(export.NewRecord(
 			key.descriptor,
 			value.labels,
 			value.resource,
-			value.aggregator,
+			value.aggregator.Aggregation(),
+			b.intervalStart,
+			b.intervalEnd,
 		)); err != nil && !errors.Is(err, aggregation.ErrNoData) {
 			return err
 		}
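
The two counters enforce a strict lifecycle: Process is legal only while exactly one StartCollection is outstanding, and ForEach only after the matching FinishCollection. A sketch of one balanced cycle, assuming b is a *simple.Integrator fed by hand:

    b.StartCollection()                // startedCollection == finishedCollection+1
    _ = b.Process(acc)                 // accepted only inside a collection
    if err := b.FinishCollection(); err != nil {
        global.Handle(err)             // ErrInconsistentState on an unbalanced sequence
    }
    _ = b.CheckpointSet().ForEach(func(r export.Record) error {
        // r.StartTime()/r.EndTime() span the interval just finished.
        return nil
    })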

View File

@@ -17,6 +17,7 @@ package simple_test
 
 import (
 	"context"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 
@@ -75,13 +76,13 @@ func LastValueAgg(desc *metric.Descriptor, v int64) export.Aggregator {
 }
 
 // Convenience method for building a test exported lastValue record.
-func NewLastValueRecord(desc *metric.Descriptor, labels *label.Set, value int64) export.Record {
-	return export.NewRecord(desc, labels, Resource, LastValueAgg(desc, value))
+func NewLastValueAccumulation(desc *metric.Descriptor, labels *label.Set, value int64) export.Accumulation {
+	return export.NewAccumulation(desc, labels, Resource, LastValueAgg(desc, value))
 }
 
 // Convenience method for building a test exported counter record.
-func NewCounterRecord(desc *metric.Descriptor, labels *label.Set, value int64) export.Record {
-	return export.NewRecord(desc, labels, Resource, CounterAgg(desc, value))
+func NewCounterAccumulation(desc *metric.Descriptor, labels *label.Set, value int64) export.Accumulation {
+	return export.NewAccumulation(desc, labels, Resource, CounterAgg(desc, value))
 }
 
 // CounterAgg returns a checkpointed counter aggregator w/ the specified descriptor and value.
@@ -95,36 +96,40 @@ func CounterAgg(desc *metric.Descriptor, v int64) export.Aggregator {
 func TestSimpleStateless(t *testing.T) {
 	b := simple.New(test.AggregationSelector(), false)
 
+	b.StartCollection()
+
 	// Set initial lastValue values
-	_ = b.Process(NewLastValueRecord(&LastValueADesc, Labels1, 10))
-	_ = b.Process(NewLastValueRecord(&LastValueADesc, Labels2, 20))
-	_ = b.Process(NewLastValueRecord(&LastValueADesc, Labels3, 30))
+	_ = b.Process(NewLastValueAccumulation(&LastValueADesc, Labels1, 10))
+	_ = b.Process(NewLastValueAccumulation(&LastValueADesc, Labels2, 20))
+	_ = b.Process(NewLastValueAccumulation(&LastValueADesc, Labels3, 30))
 
-	_ = b.Process(NewLastValueRecord(&LastValueBDesc, Labels1, 10))
-	_ = b.Process(NewLastValueRecord(&LastValueBDesc, Labels2, 20))
-	_ = b.Process(NewLastValueRecord(&LastValueBDesc, Labels3, 30))
+	_ = b.Process(NewLastValueAccumulation(&LastValueBDesc, Labels1, 10))
+	_ = b.Process(NewLastValueAccumulation(&LastValueBDesc, Labels2, 20))
+	_ = b.Process(NewLastValueAccumulation(&LastValueBDesc, Labels3, 30))
 
 	// Another lastValue Set for Labels1
-	_ = b.Process(NewLastValueRecord(&LastValueADesc, Labels1, 50))
-	_ = b.Process(NewLastValueRecord(&LastValueBDesc, Labels1, 50))
+	_ = b.Process(NewLastValueAccumulation(&LastValueADesc, Labels1, 50))
+	_ = b.Process(NewLastValueAccumulation(&LastValueBDesc, Labels1, 50))
 
 	// Set initial counter values
-	_ = b.Process(NewCounterRecord(&CounterADesc, Labels1, 10))
-	_ = b.Process(NewCounterRecord(&CounterADesc, Labels2, 20))
-	_ = b.Process(NewCounterRecord(&CounterADesc, Labels3, 40))
+	_ = b.Process(NewCounterAccumulation(&CounterADesc, Labels1, 10))
+	_ = b.Process(NewCounterAccumulation(&CounterADesc, Labels2, 20))
+	_ = b.Process(NewCounterAccumulation(&CounterADesc, Labels3, 40))
 
-	_ = b.Process(NewCounterRecord(&CounterBDesc, Labels1, 10))
-	_ = b.Process(NewCounterRecord(&CounterBDesc, Labels2, 20))
-	_ = b.Process(NewCounterRecord(&CounterBDesc, Labels3, 40))
+	_ = b.Process(NewCounterAccumulation(&CounterBDesc, Labels1, 10))
+	_ = b.Process(NewCounterAccumulation(&CounterBDesc, Labels2, 20))
+	_ = b.Process(NewCounterAccumulation(&CounterBDesc, Labels3, 40))
 
 	// Another counter Add for Labels1
-	_ = b.Process(NewCounterRecord(&CounterADesc, Labels1, 50))
-	_ = b.Process(NewCounterRecord(&CounterBDesc, Labels1, 50))
+	_ = b.Process(NewCounterAccumulation(&CounterADesc, Labels1, 50))
+	_ = b.Process(NewCounterAccumulation(&CounterBDesc, Labels1, 50))
+
+	require.NoError(t, b.FinishCollection())
 
 	checkpointSet := b.CheckpointSet()
 
 	records := test.NewOutput(label.DefaultEncoder())
-	_ = checkpointSet.ForEach(records.AddTo)
+	_ = checkpointSet.ForEach(records.AddRecord)
 
 	// Output lastvalue should have only the "G=H" and "G=" keys.
 	// Output counter should have only the "C=D" and "C=" keys.
@@ -142,32 +147,34 @@ func TestSimpleStateless(t *testing.T) {
 		"b.lastvalue/C=D,E=F/R=V": 20, // labels2
 		"b.lastvalue//R=V":        30, // labels3
 	}, records.Map)
-	b.FinishedCollection()
 
 	// Verify that state was reset
+	b.StartCollection()
+	require.NoError(t, b.FinishCollection())
 	checkpointSet = b.CheckpointSet()
 	_ = checkpointSet.ForEach(func(rec export.Record) error {
 		t.Fatal("Unexpected call")
 		return nil
 	})
-	b.FinishedCollection()
 }
 
 func TestSimpleStateful(t *testing.T) {
 	ctx := context.Background()
 	b := simple.New(test.AggregationSelector(), true)
-	counterA := NewCounterRecord(&CounterADesc, Labels1, 10)
+	b.StartCollection()
+	counterA := NewCounterAccumulation(&CounterADesc, Labels1, 10)
 	_ = b.Process(counterA)
 
-	counterB := NewCounterRecord(&CounterBDesc, Labels1, 10)
+	counterB := NewCounterAccumulation(&CounterBDesc, Labels1, 10)
 	_ = b.Process(counterB)
+	require.NoError(t, b.FinishCollection())
 
 	checkpointSet := b.CheckpointSet()
-	b.FinishedCollection()
 
 	records1 := test.NewOutput(label.DefaultEncoder())
-	_ = checkpointSet.ForEach(records1.AddTo)
+	_ = checkpointSet.ForEach(records1.AddRecord)
 
 	require.EqualValues(t, map[string]float64{
 		"a.sum/C=D,G=H/R=V": 10, // labels1
@@ -180,11 +187,13 @@ func TestSimpleStateful(t *testing.T) {
 	// Test that state was NOT reset
 	checkpointSet = b.CheckpointSet()
 
+	b.StartCollection()
+	require.NoError(t, b.FinishCollection())
+
 	records2 := test.NewOutput(label.DefaultEncoder())
-	_ = checkpointSet.ForEach(records2.AddTo)
+	_ = checkpointSet.ForEach(records2.AddRecord)
 
 	require.EqualValues(t, records1.Map, records2.Map)
-	b.FinishedCollection()
 
 	// Update and re-checkpoint the original record.
 	_ = caggA.Update(ctx, metric.NewInt64Number(20), &CounterADesc)
@@ -199,23 +208,92 @@ func TestSimpleStateful(t *testing.T) {
 	checkpointSet = b.CheckpointSet()
 
 	records3 := test.NewOutput(label.DefaultEncoder())
-	_ = checkpointSet.ForEach(records3.AddTo)
+	_ = checkpointSet.ForEach(records3.AddRecord)
 	require.EqualValues(t, records1.Map, records3.Map)
-	b.FinishedCollection()
+	b.StartCollection()
 
 	// Now process the second update
-	_ = b.Process(export.NewRecord(&CounterADesc, Labels1, Resource, ckptA))
-	_ = b.Process(export.NewRecord(&CounterBDesc, Labels1, Resource, ckptB))
+	_ = b.Process(export.NewAccumulation(&CounterADesc, Labels1, Resource, ckptA))
+	_ = b.Process(export.NewAccumulation(&CounterBDesc, Labels1, Resource, ckptB))
+	require.NoError(t, b.FinishCollection())
 
 	checkpointSet = b.CheckpointSet()
 
 	records4 := test.NewOutput(label.DefaultEncoder())
-	_ = checkpointSet.ForEach(records4.AddTo)
+	_ = checkpointSet.ForEach(records4.AddRecord)
 	require.EqualValues(t, map[string]float64{
 		"a.sum/C=D,G=H/R=V": 30,
 		"b.sum/C=D,G=H/R=V": 30,
 	}, records4.Map)
-	b.FinishedCollection()
 }
+
+func TestSimpleInconsistent(t *testing.T) {
+	// Test double-start
+	b := simple.New(test.AggregationSelector(), true)
+
+	b.StartCollection()
+	b.StartCollection()
+	require.Equal(t, simple.ErrInconsistentState, b.FinishCollection())
+
+	// Test finish without start
+	b = simple.New(test.AggregationSelector(), true)
+
+	require.Equal(t, simple.ErrInconsistentState, b.FinishCollection())
+
+	// Test no finish
+	b = simple.New(test.AggregationSelector(), true)
+
+	b.StartCollection()
+	require.Equal(t, simple.ErrInconsistentState, b.ForEach(func(export.Record) error { return nil }))
+
+	// Test no start
+	b = simple.New(test.AggregationSelector(), true)
+
+	require.Equal(t, simple.ErrInconsistentState, b.Process(NewCounterAccumulation(&CounterADesc, Labels1, 10)))
+}
+
+func TestSimpleTimestamps(t *testing.T) {
+	beforeNew := time.Now()
+	b := simple.New(test.AggregationSelector(), true)
+	afterNew := time.Now()
+
+	b.StartCollection()
+	_ = b.Process(NewCounterAccumulation(&CounterADesc, Labels1, 10))
+	require.NoError(t, b.FinishCollection())
+
+	var start1, end1 time.Time
+
+	require.NoError(t, b.ForEach(func(rec export.Record) error {
+		start1 = rec.StartTime()
+		end1 = rec.EndTime()
+		return nil
+	}))
+
+	// The first start time is set in the constructor.
+	require.True(t, beforeNew.Before(start1))
+	require.True(t, afterNew.After(start1))
+
+	for i := 0; i < 2; i++ {
+		b.StartCollection()
+		require.NoError(t, b.Process(NewCounterAccumulation(&CounterADesc, Labels1, 10)))
+		require.NoError(t, b.FinishCollection())
+
+		var start2, end2 time.Time
+
+		require.NoError(t, b.ForEach(func(rec export.Record) error {
+			start2 = rec.StartTime()
+			end2 = rec.EndTime()
+			return nil
+		}))
+
+		// Subsequent intervals have their start and end aligned.
+		require.Equal(t, start2, end1)
+		require.True(t, start1.Before(end1))
+		require.True(t, start2.Before(end2))
+
+		start1 = start2
+		end1 = end2
+	}
+}

View File

@@ -17,6 +17,7 @@ package test
 
 import (
 	"fmt"
 	"strings"
+	"time"
 
 	"go.opentelemetry.io/otel/api/label"
 	"go.opentelemetry.io/otel/api/metric"
@@ -99,23 +100,42 @@ func (testAggregationSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ..
 	}
 }
 
-// AddTo adds a name/label-encoding entry with the lastValue or counter
-// value to the output map.
-func (o Output) AddTo(rec export.Record) error {
+// AddRecord adds a string representation of the exported metric data
+// to a map for use in testing. The value taken from the record is
+// either the Sum() or the LastValue() of its Aggregation(), whichever
+// is defined. Record timestamps are ignored.
+func (o Output) AddRecord(rec export.Record) error {
 	encoded := rec.Labels().Encoded(o.labelEncoder)
 	rencoded := rec.Resource().Encoded(o.labelEncoder)
 	key := fmt.Sprint(rec.Descriptor().Name(), "/", encoded, "/", rencoded)
 	var value float64
-	if s, ok := rec.Aggregator().(aggregation.Sum); ok {
+	if s, ok := rec.Aggregation().(aggregation.Sum); ok {
 		sum, _ := s.Sum()
 		value = sum.CoerceToFloat64(rec.Descriptor().NumberKind())
-	} else if l, ok := rec.Aggregator().(aggregation.LastValue); ok {
+	} else if l, ok := rec.Aggregation().(aggregation.LastValue); ok {
 		last, _, _ := l.LastValue()
 		value = last.CoerceToFloat64(rec.Descriptor().NumberKind())
 	} else {
-		panic(fmt.Sprintf("Unhandled aggregator type: %T", rec.Aggregator()))
+		panic(fmt.Sprintf("Unhandled aggregator type: %T", rec.Aggregation()))
 	}
 	o.Map[key] = value
 	return nil
 }
+
+// AddAccumulation adds a string representation of the exported metric
+// data to a map for use in testing. The value taken from the
+// accumulation is either the Sum() or the LastValue() of its
+// Aggregator().Aggregation(), whichever is defined.
+func (o Output) AddAccumulation(acc export.Accumulation) error {
+	return o.AddRecord(
+		export.NewRecord(
+			acc.Descriptor(),
+			acc.Labels(),
+			acc.Resource(),
+			acc.Aggregator().Aggregation(),
+			time.Time{},
+			time.Time{},
+		),
+	)
+}

View File

@@ -444,8 +444,8 @@ func (m *Accumulator) checkpointRecord(r *record) int {
 		return 0
 	}
 
-	exportRecord := export.NewRecord(&r.inst.descriptor, r.labels, m.resource, r.checkpoint)
-	err = m.integrator.Process(exportRecord)
+	a := export.NewAccumulation(&r.inst.descriptor, r.labels, m.resource, r.checkpoint)
+	err = m.integrator.Process(a)
 	if err != nil {
 		global.Handle(err)
 	}
@@ -462,8 +462,8 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int {
 		epochDiff := m.currentEpoch - lrec.observedEpoch
 		if epochDiff == 0 {
 			if lrec.observed != nil {
-				exportRecord := export.NewRecord(&a.descriptor, lrec.labels, m.resource, lrec.observed)
-				err := m.integrator.Process(exportRecord)
+				a := export.NewAccumulation(&a.descriptor, lrec.labels, m.resource, lrec.observed)
+				err := m.integrator.Process(a)
 				if err != nil {
 					global.Handle(err)
 				}

View File

@@ -249,14 +249,11 @@ func (*testFixture) CheckpointSet() export.CheckpointSet {
 	return nil
 }
 
-func (*testFixture) FinishedCollection() {
-}
-
-func (f *testFixture) Process(record export.Record) error {
-	labels := record.Labels().ToSlice()
+func (f *testFixture) Process(accumulation export.Accumulation) error {
+	labels := accumulation.Labels().ToSlice()
 	key := testKey{
 		labels:     canonicalizeLabels(labels),
-		descriptor: record.Descriptor(),
+		descriptor: accumulation.Descriptor(),
 	}
 	if f.dupCheck[key] == 0 {
 		f.dupCheck[key]++
@@ -266,8 +263,8 @@ func (f *testFixture) Process(accumulation export.Accumulation) error {
 
 	actual, _ := f.received.LoadOrStore(key, f.impl.newStore())
 
-	agg := record.Aggregator()
-	switch record.Descriptor().MetricKind() {
+	agg := accumulation.Aggregator()
+	switch accumulation.Descriptor().MetricKind() {
 	case metric.CounterKind:
 		sum, err := agg.(aggregation.Sum).Sum()
 		if err != nil {