From 69da3056f24170ccbfcd81031819fa36025e02a4 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Mon, 18 May 2020 17:44:28 -0700 Subject: [PATCH] Move Resource into the metric export Record (#739) * Checkpoint * Tests pass --- exporters/metric/prometheus/example_test.go | 2 +- exporters/metric/prometheus/prometheus.go | 5 +- .../metric/prometheus/prometheus_test.go | 4 +- exporters/metric/stdout/stdout.go | 5 +- exporters/metric/stdout/stdout_test.go | 48 +++++----- exporters/metric/test/test.go | 13 ++- exporters/otlp/internal/transform/metric.go | 8 +- exporters/otlp/otlp.go | 5 +- exporters/otlp/otlp_metric_test.go | 11 +-- sdk/export/metric/metric.go | 14 ++- sdk/metric/config.go | 19 ++++ sdk/metric/controller/push/push.go | 13 +-- sdk/metric/controller/push/push_test.go | 25 +++-- sdk/metric/correct_test.go | 94 +++++++------------ sdk/metric/integrator/simple/simple.go | 6 ++ sdk/metric/integrator/simple/simple_test.go | 36 +++---- sdk/metric/integrator/test/test.go | 11 ++- sdk/metric/sdk.go | 7 +- 18 files changed, 172 insertions(+), 154 deletions(-) diff --git a/exporters/metric/prometheus/example_test.go b/exporters/metric/prometheus/example_test.go index 81e741a38..1a15e38de 100644 --- a/exporters/metric/prometheus/example_test.go +++ b/exporters/metric/prometheus/example_test.go @@ -66,7 +66,7 @@ func ExampleNewExportPipeline() { // Simulate a push meterImpl.Collect(ctx) - err = exporter.Export(ctx, nil, integrator.CheckpointSet()) + err = exporter.Export(ctx, integrator.CheckpointSet()) if err != nil { panic(err) } diff --git a/exporters/metric/prometheus/prometheus.go b/exporters/metric/prometheus/prometheus.go index 4d615fdf4..86bedf02a 100644 --- a/exporters/metric/prometheus/prometheus.go +++ b/exporters/metric/prometheus/prometheus.go @@ -32,7 +32,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric/controller/push" integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple" "go.opentelemetry.io/otel/sdk/metric/selector/simple" - "go.opentelemetry.io/otel/sdk/resource" ) // Exporter is an implementation of metric.Exporter that sends metrics to @@ -169,8 +168,7 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, h } // Export exports the provide metric record to prometheus. -func (e *Exporter) Export(_ context.Context, _ *resource.Resource, checkpointSet export.CheckpointSet) error { - // TODO: Use the resource value in this exporter. +func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { e.snapshot = checkpointSet return nil } @@ -211,6 +209,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { err := c.exp.snapshot.ForEach(func(record export.Record) error { agg := record.Aggregator() numberKind := record.Descriptor().NumberKind() + // TODO: Use the resource value in this record. labels := labelValues(record.Labels()) desc := c.toDesc(&record) diff --git a/exporters/metric/prometheus/prometheus_test.go b/exporters/metric/prometheus/prometheus_test.go index f30813b06..a95d09ad3 100644 --- a/exporters/metric/prometheus/prometheus_test.go +++ b/exporters/metric/prometheus/prometheus_test.go @@ -39,7 +39,7 @@ func TestPrometheusExporter(t *testing.T) { } var expected []string - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(nil) counter := metric.NewDescriptor( "counter", metric.CounterKind, metric.Float64NumberKind) @@ -117,7 +117,7 @@ func TestPrometheusExporter(t *testing.T) { } func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *test.CheckpointSet, expected []string) { - err := exporter.Export(context.Background(), nil, checkpointSet) + err := exporter.Export(context.Background(), checkpointSet) require.Nil(t, err) rec := httptest.NewRecorder() diff --git a/exporters/metric/stdout/stdout.go b/exporters/metric/stdout/stdout.go index 5e8b513a5..433288503 100644 --- a/exporters/metric/stdout/stdout.go +++ b/exporters/metric/stdout/stdout.go @@ -25,7 +25,6 @@ import ( "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/label" - "go.opentelemetry.io/otel/sdk/resource" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" @@ -145,18 +144,18 @@ func NewExportPipeline(config Config, period time.Duration, opts ...push.Option) return pusher, nil } -func (e *Exporter) Export(_ context.Context, resource *resource.Resource, checkpointSet export.CheckpointSet) error { +func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { var aggError error var batch expoBatch if !e.config.DoNotPrintTime { ts := time.Now() batch.Timestamp = &ts } - encodedResource := resource.Encoded(e.config.LabelEncoder) aggError = checkpointSet.ForEach(func(record export.Record) error { desc := record.Descriptor() agg := record.Aggregator() kind := desc.NumberKind() + encodedResource := record.Resource().Encoded(e.config.LabelEncoder) var expose expoLine diff --git a/exporters/metric/stdout/stdout_test.go b/exporters/metric/stdout/stdout_test.go index 2dee68e55..1d5805ba2 100644 --- a/exporters/metric/stdout/stdout_test.go +++ b/exporters/metric/stdout/stdout_test.go @@ -44,10 +44,11 @@ type testFixture struct { ctx context.Context exporter *stdout.Exporter output *bytes.Buffer - resource *resource.Resource } -func newFixture(t *testing.T, resource *resource.Resource, config stdout.Config) testFixture { +var testResource = resource.New(kv.String("R", "V")) + +func newFixture(t *testing.T, config stdout.Config) testFixture { buf := &bytes.Buffer{} config.Writer = buf config.DoNotPrintTime = true @@ -60,7 +61,6 @@ func newFixture(t *testing.T, resource *resource.Resource, config stdout.Config) ctx: context.Background(), exporter: exp, output: buf, - resource: resource, } } @@ -69,7 +69,7 @@ func (fix testFixture) Output() string { } func (fix testFixture) Export(checkpointSet export.CheckpointSet) { - err := fix.exporter.Export(fix.ctx, fix.resource, checkpointSet) + err := fix.exporter.Export(fix.ctx, checkpointSet) if err != nil { fix.t.Error("export failed: ", err) } @@ -95,7 +95,7 @@ func TestStdoutTimestamp(t *testing.T) { before := time.Now() - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) ctx := context.Background() desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Int64NumberKind) @@ -105,7 +105,7 @@ func TestStdoutTimestamp(t *testing.T) { checkpointSet.Add(&desc, lvagg) - if err := exporter.Export(ctx, nil, checkpointSet); err != nil { + if err := exporter.Export(ctx, checkpointSet); err != nil { t.Fatal("Unexpected export error: ", err) } @@ -139,9 +139,9 @@ func TestStdoutTimestamp(t *testing.T) { } func TestStdoutCounterFormat(t *testing.T) { - fix := newFixture(t, nil, stdout.Config{}) + fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) desc := metric.NewDescriptor("test.name", metric.CounterKind, metric.Int64NumberKind) cagg := sum.New() @@ -152,13 +152,13 @@ func TestStdoutCounterFormat(t *testing.T) { fix.Export(checkpointSet) - require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","sum":123}]}`, fix.Output()) + require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","sum":123}]}`, fix.Output()) } func TestStdoutLastValueFormat(t *testing.T) { - fix := newFixture(t, nil, stdout.Config{}) + fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind) lvagg := lastvalue.New() @@ -169,13 +169,13 @@ func TestStdoutLastValueFormat(t *testing.T) { fix.Export(checkpointSet) - require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","last":123.456}]}`, fix.Output()) + require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","last":123.456}]}`, fix.Output()) } func TestStdoutMinMaxSumCount(t *testing.T) { - fix := newFixture(t, nil, stdout.Config{}) + fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind) magg := minmaxsumcount.New(&desc) @@ -187,15 +187,15 @@ func TestStdoutMinMaxSumCount(t *testing.T) { fix.Export(checkpointSet) - require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","min":123.456,"max":876.543,"sum":999.999,"count":2}]}`, fix.Output()) + require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","min":123.456,"max":876.543,"sum":999.999,"count":2}]}`, fix.Output()) } func TestStdoutValueRecorderFormat(t *testing.T) { - fix := newFixture(t, nil, stdout.Config{ + fix := newFixture(t, stdout.Config{ PrettyPrint: true, }) - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind) magg := array.New() @@ -213,7 +213,7 @@ func TestStdoutValueRecorderFormat(t *testing.T) { require.Equal(t, `{ "updates": [ { - "name": "test.name{A=B,C=D}", + "name": "test.name{R=V,A=B,C=D}", "min": 0.5, "max": 999.5, "sum": 500000, @@ -247,9 +247,9 @@ func TestStdoutNoData(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() - fix := newFixture(t, nil, stdout.Config{}) + fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) magg := tc magg.Checkpoint(fix.ctx, &desc) @@ -264,9 +264,9 @@ func TestStdoutNoData(t *testing.T) { } func TestStdoutLastValueNotSet(t *testing.T) { - fix := newFixture(t, nil, stdout.Config{}) + fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind) lvagg := lastvalue.New() @@ -314,9 +314,9 @@ func TestStdoutResource(t *testing.T) { } for _, tc := range testCases { - fix := newFixture(t, tc.res, stdout.Config{}) + fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(tc.res) desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind) lvagg := lastvalue.New() diff --git a/exporters/metric/test/test.go b/exporters/metric/test/test.go index bc49cd9c9..cb99b6489 100644 --- a/exporters/metric/test/test.go +++ b/exporters/metric/test/test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" + "go.opentelemetry.io/otel/sdk/resource" ) type mapkey struct { @@ -35,15 +36,17 @@ type mapkey struct { } type CheckpointSet struct { - records map[mapkey]export.Record - updates []export.Record + records map[mapkey]export.Record + resource *resource.Resource + updates []export.Record } // NewCheckpointSet returns a test CheckpointSet that new records could be added. // Records are grouped by their encoded labels. -func NewCheckpointSet() *CheckpointSet { +func NewCheckpointSet(resource *resource.Resource) *CheckpointSet { return &CheckpointSet{ - records: make(map[mapkey]export.Record), + records: make(map[mapkey]export.Record), + resource: resource, } } @@ -67,7 +70,7 @@ func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, l return record.Aggregator(), false } - rec := export.NewRecord(desc, &elabels, newAgg) + rec := export.NewRecord(desc, &elabels, p.resource, newAgg) p.updates = append(p.updates, rec) p.records[key] = rec return newAgg, true diff --git a/exporters/otlp/internal/transform/metric.go b/exporters/otlp/internal/transform/metric.go index f9dd696bd..318378961 100644 --- a/exporters/otlp/internal/transform/metric.go +++ b/exporters/otlp/internal/transform/metric.go @@ -61,7 +61,7 @@ type result struct { // CheckpointSet transforms all records contained in a checkpoint into // batched OTLP ResourceMetrics. -func CheckpointSet(ctx context.Context, resource *resource.Resource, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) { +func CheckpointSet(ctx context.Context, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) { records, errc := source(ctx, cps) // Start a fixed number of goroutines to transform records. @@ -71,7 +71,7 @@ func CheckpointSet(ctx context.Context, resource *resource.Resource, cps export. for i := uint(0); i < numWorkers; i++ { go func() { defer wg.Done() - transformer(ctx, resource, records, transformed) + transformer(ctx, records, transformed) }() } go func() { @@ -116,7 +116,7 @@ func source(ctx context.Context, cps export.CheckpointSet) (<-chan export.Record // transformer transforms records read from the passed in chan into // OTLP Metrics which are sent on the out chan. -func transformer(ctx context.Context, resource *resource.Resource, in <-chan export.Record, out chan<- result) { +func transformer(ctx context.Context, in <-chan export.Record, out chan<- result) { for r := range in { m, err := Record(r) // Propagate errors, but do not send empty results. @@ -124,7 +124,7 @@ func transformer(ctx context.Context, resource *resource.Resource, in <-chan exp continue } res := result{ - Resource: resource, + Resource: r.Resource(), Library: r.Descriptor().LibraryName(), Metric: m, Err: err, diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 0c06676be..d0e83f944 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -31,7 +31,6 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/internal/transform" metricsdk "go.opentelemetry.io/otel/sdk/export/metric" tracesdk "go.opentelemetry.io/otel/sdk/export/trace" - "go.opentelemetry.io/otel/sdk/resource" ) type Exporter struct { @@ -212,7 +211,7 @@ func (e *Exporter) Stop() error { // Export implements the "go.opentelemetry.io/otel/sdk/export/metric".Exporter // interface. It transforms and batches metric Records into OTLP Metrics and // transmits them to the configured collector. -func (e *Exporter) Export(parent context.Context, resource *resource.Resource, cps metricsdk.CheckpointSet) error { +func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error { // Unify the parent context Done signal with the exporter stopCh. ctx, cancel := context.WithCancel(parent) defer cancel() @@ -224,7 +223,7 @@ func (e *Exporter) Export(parent context.Context, resource *resource.Resource, c } }(ctx, cancel) - rms, err := transform.CheckpointSet(ctx, resource, cps, e.c.numWorkers) + rms, err := transform.CheckpointSet(ctx, cps, e.c.numWorkers) if err != nil { return err } diff --git a/exporters/otlp/otlp_metric_test.go b/exporters/otlp/otlp_metric_test.go index 4d72d541a..db47cb157 100644 --- a/exporters/otlp/otlp_metric_test.go +++ b/exporters/otlp/otlp_metric_test.go @@ -659,11 +659,10 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me equiv := r.resource.Equivalent() resources[equiv] = r.resource - recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, agg)) + recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, r.resource, agg)) } - for equiv, records := range recs { - resource := resources[equiv] - assert.NoError(t, exp.Export(context.Background(), resource, checkpointSet{records: records})) + for _, records := range recs { + assert.NoError(t, exp.Export(context.Background(), checkpointSet{records: records})) } // assert.ElementsMatch does not equate nested slices of different order, @@ -713,8 +712,6 @@ func TestEmptyMetricExport(t *testing.T) { exp.metricExporter = msc exp.started = true - resource := resource.New(kv.String("R", "S")) - for _, test := range []struct { records []metricsdk.Record want []metricpb.ResourceMetrics @@ -729,7 +726,7 @@ func TestEmptyMetricExport(t *testing.T) { }, } { msc.Reset() - require.NoError(t, exp.Export(context.Background(), resource, checkpointSet{records: test.records})) + require.NoError(t, exp.Export(context.Background(), checkpointSet{records: test.records})) assert.Equal(t, test.want, msc.ResourceMetrics()) } } diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index c2d583173..86f195aa1 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -154,12 +154,9 @@ type Exporter interface { // The Context comes from the controller that initiated // collection. // - // The Resource contains common attributes that apply to all - // metric events in the SDK. - // // The CheckpointSet interface refers to the Integrator that just // completed collection. - Export(context.Context, *resource.Resource, CheckpointSet) error + Export(context.Context, CheckpointSet) error } // CheckpointSet allows a controller to access a complete checkpoint of @@ -183,16 +180,18 @@ type CheckpointSet interface { type Record struct { descriptor *metric.Descriptor labels *label.Set + resource *resource.Resource aggregator Aggregator } // NewRecord allows Integrator implementations to construct export // records. The Descriptor, Labels, and Aggregator represent // aggregate metric events received over a single collection period. -func NewRecord(descriptor *metric.Descriptor, labels *label.Set, aggregator Aggregator) Record { +func NewRecord(descriptor *metric.Descriptor, labels *label.Set, resource *resource.Resource, aggregator Aggregator) Record { return Record{ descriptor: descriptor, labels: labels, + resource: resource, aggregator: aggregator, } } @@ -213,3 +212,8 @@ func (r Record) Descriptor() *metric.Descriptor { func (r Record) Labels() *label.Set { return r.labels } + +// Resource contains common attributes that apply to this metric event. +func (r Record) Resource() *resource.Resource { + return r.resource +} diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 44f06fbe3..dbdbc57f5 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -14,6 +14,8 @@ package metric +import "go.opentelemetry.io/otel/sdk/resource" + // Config contains configuration for an SDK. type Config struct { // ErrorHandler is the function called when the SDK encounters an error. @@ -21,6 +23,10 @@ type Config struct { // This option can be overridden after instantiation of the SDK // with the `SetErrorHandler` method. ErrorHandler ErrorHandler + + // Resource describes all the metric records processed by the + // Accumulator. + Resource *resource.Resource } // Option is the interface that applies the value to a configuration option. @@ -39,3 +45,16 @@ type errorHandlerOption ErrorHandler func (o errorHandlerOption) Apply(config *Config) { config.ErrorHandler = ErrorHandler(o) } + +// WithResource sets the Resource configuration option of a Config. +func WithResource(res *resource.Resource) Option { + return resourceOption{res} +} + +type resourceOption struct { + *resource.Resource +} + +func (o resourceOption) Apply(config *Config) { + config.Resource = o.Resource +} diff --git a/sdk/metric/controller/push/push.go b/sdk/metric/controller/push/push.go index 0e00ce5fd..ea347d3fa 100644 --- a/sdk/metric/controller/push/push.go +++ b/sdk/metric/controller/push/push.go @@ -31,7 +31,7 @@ type Controller struct { lock sync.Mutex collectLock sync.Mutex accumulator *sdk.Accumulator - resource *resource.Resource + provider *registry.Provider errorHandler sdk.ErrorHandler integrator export.Integrator exporter export.Exporter @@ -40,7 +40,6 @@ type Controller struct { period time.Duration ticker Ticker clock Clock - provider *registry.Provider } // Several types below are created to match "github.com/benbjohnson/clock" @@ -71,15 +70,17 @@ var _ Ticker = realTicker{} // configuration options to configure an SDK with periodic collection. // The integrator itself is configured with the aggregation selector policy. func New(integrator export.Integrator, exporter export.Exporter, period time.Duration, opts ...Option) *Controller { - c := &Config{ErrorHandler: sdk.DefaultErrorHandler} + c := &Config{ + ErrorHandler: sdk.DefaultErrorHandler, + Resource: resource.Empty(), + } for _, opt := range opts { opt.Apply(c) } - impl := sdk.NewAccumulator(integrator, sdk.WithErrorHandler(c.ErrorHandler)) + impl := sdk.NewAccumulator(integrator, sdk.WithErrorHandler(c.ErrorHandler), sdk.WithResource(c.Resource)) return &Controller{ accumulator: impl, - resource: c.Resource, provider: registry.NewProvider(impl), errorHandler: c.ErrorHandler, integrator: integrator, @@ -166,7 +167,7 @@ func (c *Controller) tick() { mtx: &c.collectLock, delegate: c.integrator.CheckpointSet(), } - err := c.exporter.Export(ctx, c.resource, checkpointSet) + err := c.exporter.Export(ctx, checkpointSet) c.integrator.FinishedCollection() if err != nil { diff --git a/sdk/metric/controller/push/push_test.go b/sdk/metric/controller/push/push_test.go index fd200f4d0..dc4ae94c6 100644 --- a/sdk/metric/controller/push/push_test.go +++ b/sdk/metric/controller/push/push_test.go @@ -25,6 +25,8 @@ import ( "github.com/benbjohnson/clock" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/api/kv" + "go.opentelemetry.io/otel/api/label" "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporters/metric/test" export "go.opentelemetry.io/otel/sdk/export/metric" @@ -42,6 +44,8 @@ type testIntegrator struct { finishes int } +var testResource = resource.New(kv.String("R", "V")) + type testExporter struct { t *testing.T lock sync.Mutex @@ -68,7 +72,7 @@ var _ push.Clock = mockClock{} var _ push.Ticker = mockTicker{} func newFixture(t *testing.T) testFixture { - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) integrator := &testIntegrator{ t: t, @@ -115,7 +119,7 @@ func (b *testIntegrator) getCounts() (checkpoints, finishes int) { return b.checkpoints, b.finishes } -func (e *testExporter) Export(_ context.Context, _ *resource.Resource, checkpointSet export.CheckpointSet) error { +func (e *testExporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { e.lock.Lock() defer e.lock.Unlock() e.exports++ @@ -213,6 +217,7 @@ func TestPushTicker(t *testing.T) { require.Equal(t, 1, exports) require.Equal(t, 1, len(records)) require.Equal(t, "counter", records[0].Descriptor().Name()) + require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder())) sum, err := records[0].Aggregator().(aggregator.Sum).Sum() require.Equal(t, int64(3), sum.AsInt64()) @@ -232,6 +237,7 @@ func TestPushTicker(t *testing.T) { require.Equal(t, 2, exports) require.Equal(t, 1, len(records)) require.Equal(t, "counter", records[0].Descriptor().Name()) + require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder())) sum, err = records[0].Aggregator().(aggregator.Sum).Sum() require.Equal(t, int64(7), sum.AsInt64()) @@ -256,8 +262,8 @@ func TestPushExportError(t *testing.T) { expectedDescriptors []string expectedError error }{ - {"errNone", nil, []string{"counter1", "counter2"}, nil}, - {"errNoData", aggregator.ErrNoData, []string{"counter2"}, nil}, + {"errNone", nil, []string{"counter1{R=V,X=Y}", "counter2{R=V,}"}, nil}, + {"errNoData", aggregator.ErrNoData, []string{"counter2{R=V,}"}, nil}, {"errUnexpected", errAggregator, []string{}, errAggregator}, } for _, tt := range tests { @@ -287,7 +293,7 @@ func TestPushExportError(t *testing.T) { p.Start() runtime.Gosched() - counter1.Add(ctx, 3) + counter1.Add(ctx, 3, kv.String("X", "Y")) counter2.Add(ctx, 5) require.Equal(t, 0, fix.exporter.exports) @@ -311,11 +317,16 @@ func TestPushExportError(t *testing.T) { lock.Unlock() require.Equal(t, len(tt.expectedDescriptors), len(records)) for _, r := range records { - require.Contains(t, tt.expectedDescriptors, r.Descriptor().Name()) + require.Contains(t, tt.expectedDescriptors, + fmt.Sprintf("%s{%s,%s}", + r.Descriptor().Name(), + r.Resource().Encoded(label.DefaultEncoder()), + r.Labels().Encoded(label.DefaultEncoder()), + ), + ) } p.Stop() - }) } } diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 686e168a5..519bf342b 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -33,9 +33,11 @@ import ( "go.opentelemetry.io/otel/sdk/metric/aggregator/array" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" batchTest "go.opentelemetry.io/otel/sdk/metric/integrator/test" + "go.opentelemetry.io/otel/sdk/resource" ) var Must = metric.Must +var testResource = resource.New(kv.String("R", "V")) type correctnessIntegrator struct { newAggCount int64 @@ -45,6 +47,15 @@ type correctnessIntegrator struct { records []export.Record } +func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessIntegrator) { + integrator := &correctnessIntegrator{ + t: t, + } + accum := metricsdk.NewAccumulator(integrator, metricsdk.WithResource(testResource)) + meter := metric.WrapMeterImpl(accum, "test") + return meter, accum, integrator +} + func (cb *correctnessIntegrator) AggregatorFor(descriptor *metric.Descriptor) (agg export.Aggregator) { name := descriptor.Name() @@ -77,11 +88,7 @@ func (cb *correctnessIntegrator) Process(_ context.Context, record export.Record func TestInputRangeTestCounter(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) var sdkErr error sdk.SetErrorHandler(func(handleErr error) { @@ -109,11 +116,7 @@ func TestInputRangeTestCounter(t *testing.T) { func TestInputRangeTestValueRecorder(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) var sdkErr error sdk.SetErrorHandler(func(handleErr error) { @@ -144,11 +147,7 @@ func TestInputRangeTestValueRecorder(t *testing.T) { func TestDisabledInstrument(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) valuerecorder := Must(meter).NewFloat64ValueRecorder("name.disabled") @@ -161,12 +160,7 @@ func TestDisabledInstrument(t *testing.T) { func TestRecordNaN(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, _ := newSDK(t) var sdkErr error sdk.SetErrorHandler(func(handleErr error) { @@ -181,11 +175,7 @@ func TestRecordNaN(t *testing.T) { func TestSDKLabelsDeduplication(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) counter := Must(meter).NewInt64Counter("counter") @@ -284,12 +274,7 @@ func TestDefaultLabelEncoder(t *testing.T) { func TestObserverCollection(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) _ = Must(meter).RegisterFloat64ValueObserver("float.valueobserver", func(result metric.Float64ObserverResult) { result.Observe(1, kv.String("A", "B")) @@ -317,21 +302,16 @@ func TestObserverCollection(t *testing.T) { _ = out.AddTo(rec) } require.EqualValues(t, map[string]float64{ - "float.valueobserver/A=B": -1, - "float.valueobserver/C=D": -1, - "int.valueobserver/": 1, - "int.valueobserver/A=B": 1, + "float.valueobserver/A=B/R=V": -1, + "float.valueobserver/C=D/R=V": -1, + "int.valueobserver//R=V": 1, + "int.valueobserver/A=B/R=V": 1, }, out.Map) } func TestObserverBatch(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) var floatObs metric.Float64ValueObserver var intObs metric.Int64ValueObserver @@ -371,21 +351,16 @@ func TestObserverBatch(t *testing.T) { _ = out.AddTo(rec) } require.EqualValues(t, map[string]float64{ - "float.valueobserver/A=B": -1, - "float.valueobserver/C=D": -1, - "int.valueobserver/": 1, - "int.valueobserver/A=B": 1, + "float.valueobserver/A=B/R=V": -1, + "float.valueobserver/C=D/R=V": -1, + "int.valueobserver//R=V": 1, + "int.valueobserver/A=B/R=V": 1, }, out.Map) } func TestRecordBatch(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) counter1 := Must(meter).NewInt64Counter("int64.counter") counter2 := Must(meter).NewFloat64Counter("float64.counter") @@ -411,10 +386,10 @@ func TestRecordBatch(t *testing.T) { _ = out.AddTo(rec) } require.EqualValues(t, map[string]float64{ - "int64.counter/A=B,C=D": 1, - "float64.counter/A=B,C=D": 2, - "int64.valuerecorder/A=B,C=D": 3, - "float64.valuerecorder/A=B,C=D": 4, + "int64.counter/A=B,C=D/R=V": 1, + "float64.counter/A=B,C=D/R=V": 2, + "int64.valuerecorder/A=B,C=D/R=V": 3, + "float64.valuerecorder/A=B,C=D/R=V": 4, }, out.Map) } @@ -423,12 +398,7 @@ func TestRecordBatch(t *testing.T) { // that its encoded labels will be cached across collection intervals. func TestRecordPersistence(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) c := Must(meter).NewFloat64Counter("sum.name") b := c.Bind(kv.String("bound", "true")) diff --git a/sdk/metric/integrator/simple/simple.go b/sdk/metric/integrator/simple/simple.go index 9a379c9fc..123361ff0 100644 --- a/sdk/metric/integrator/simple/simple.go +++ b/sdk/metric/integrator/simple/simple.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/otel/api/metric" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" + "go.opentelemetry.io/otel/sdk/resource" ) type ( @@ -34,11 +35,13 @@ type ( batchKey struct { descriptor *metric.Descriptor distinct label.Distinct + resource label.Distinct } batchValue struct { aggregator export.Aggregator labels *label.Set + resource *resource.Resource } batchMap map[batchKey]batchValue @@ -64,6 +67,7 @@ func (b *Integrator) Process(_ context.Context, record export.Record) error { key := batchKey{ descriptor: desc, distinct: record.Labels().Equivalent(), + resource: record.Resource().Equivalent(), } agg := record.Aggregator() value, ok := b.batchMap[key] @@ -91,6 +95,7 @@ func (b *Integrator) Process(_ context.Context, record export.Record) error { b.batchMap[key] = batchValue{ aggregator: agg, labels: record.Labels(), + resource: record.Resource(), } return nil } @@ -110,6 +115,7 @@ func (c batchMap) ForEach(f func(export.Record) error) error { if err := f(export.NewRecord( key.descriptor, value.labels, + value.resource, value.aggregator, )); err != nil && !errors.Is(err, aggregator.ErrNoData) { return err diff --git a/sdk/metric/integrator/simple/simple_test.go b/sdk/metric/integrator/simple/simple_test.go index 75a9b7a42..2b43fc8a8 100644 --- a/sdk/metric/integrator/simple/simple_test.go +++ b/sdk/metric/integrator/simple/simple_test.go @@ -68,18 +68,18 @@ func TestUngroupedStateless(t *testing.T) { // Output lastvalue should have only the "G=H" and "G=" keys. // Output counter should have only the "C=D" and "C=" keys. require.EqualValues(t, map[string]float64{ - "sum.a/C~D&G~H": 60, // labels1 - "sum.a/C~D&E~F": 20, // labels2 - "sum.a/": 40, // labels3 - "sum.b/C~D&G~H": 60, // labels1 - "sum.b/C~D&E~F": 20, // labels2 - "sum.b/": 40, // labels3 - "lastvalue.a/C~D&G~H": 50, // labels1 - "lastvalue.a/C~D&E~F": 20, // labels2 - "lastvalue.a/": 30, // labels3 - "lastvalue.b/C~D&G~H": 50, // labels1 - "lastvalue.b/C~D&E~F": 20, // labels2 - "lastvalue.b/": 30, // labels3 + "sum.a/C~D&G~H/R~V": 60, // labels1 + "sum.a/C~D&E~F/R~V": 20, // labels2 + "sum.a//R~V": 40, // labels3 + "sum.b/C~D&G~H/R~V": 60, // labels1 + "sum.b/C~D&E~F/R~V": 20, // labels2 + "sum.b//R~V": 40, // labels3 + "lastvalue.a/C~D&G~H/R~V": 50, // labels1 + "lastvalue.a/C~D&E~F/R~V": 20, // labels2 + "lastvalue.a//R~V": 30, // labels3 + "lastvalue.b/C~D&G~H/R~V": 50, // labels1 + "lastvalue.b/C~D&E~F/R~V": 20, // labels2 + "lastvalue.b//R~V": 30, // labels3 }, records.Map) // Verify that state was reset @@ -110,8 +110,8 @@ func TestUngroupedStateful(t *testing.T) { _ = checkpointSet.ForEach(records1.AddTo) require.EqualValues(t, map[string]float64{ - "sum.a/C~D&G~H": 10, // labels1 - "sum.b/C~D&G~H": 10, // labels1 + "sum.a/C~D&G~H/R~V": 10, // labels1 + "sum.b/C~D&G~H/R~V": 10, // labels1 }, records1.Map) // Test that state was NOT reset @@ -140,8 +140,8 @@ func TestUngroupedStateful(t *testing.T) { require.EqualValues(t, records1.Map, records3.Map) // Now process the second update - _ = b.Process(ctx, export.NewRecord(&test.CounterADesc, test.Labels1, caggA)) - _ = b.Process(ctx, export.NewRecord(&test.CounterBDesc, test.Labels1, caggB)) + _ = b.Process(ctx, export.NewRecord(&test.CounterADesc, test.Labels1, test.Resource, caggA)) + _ = b.Process(ctx, export.NewRecord(&test.CounterBDesc, test.Labels1, test.Resource, caggB)) checkpointSet = b.CheckpointSet() b.FinishedCollection() @@ -150,7 +150,7 @@ func TestUngroupedStateful(t *testing.T) { _ = checkpointSet.ForEach(records4.AddTo) require.EqualValues(t, map[string]float64{ - "sum.a/C~D&G~H": 30, - "sum.b/C~D&G~H": 30, + "sum.a/C~D&G~H/R~V": 30, + "sum.b/C~D&G~H/R~V": 30, }, records4.Map) } diff --git a/sdk/metric/integrator/test/test.go b/sdk/metric/integrator/test/test.go index 5f18425a8..7cc6288ab 100644 --- a/sdk/metric/integrator/test/test.go +++ b/sdk/metric/integrator/test/test.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/otel/sdk/export/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" + "go.opentelemetry.io/otel/sdk/resource" ) type ( @@ -45,6 +46,9 @@ type ( ) var ( + // Resource is applied to all test records built in this package. + Resource = resource.New(kv.String("R", "V")) + // LastValueADesc and LastValueBDesc group by "G" LastValueADesc = metric.NewDescriptor( "lastvalue.a", metric.ValueObserverKind, metric.Int64NumberKind) @@ -133,12 +137,12 @@ func LastValueAgg(desc *metric.Descriptor, v int64) export.Aggregator { // Convenience method for building a test exported lastValue record. func NewLastValueRecord(desc *metric.Descriptor, labels *label.Set, value int64) export.Record { - return export.NewRecord(desc, labels, LastValueAgg(desc, value)) + return export.NewRecord(desc, labels, Resource, LastValueAgg(desc, value)) } // Convenience method for building a test exported counter record. func NewCounterRecord(desc *metric.Descriptor, labels *label.Set, value int64) export.Record { - return export.NewRecord(desc, labels, CounterAgg(desc, value)) + return export.NewRecord(desc, labels, Resource, CounterAgg(desc, value)) } // CounterAgg returns a checkpointed counter aggregator w/ the specified descriptor and value. @@ -154,7 +158,8 @@ func CounterAgg(desc *metric.Descriptor, v int64) export.Aggregator { // value to the output map. func (o Output) AddTo(rec export.Record) error { encoded := rec.Labels().Encoded(o.labelEncoder) - key := fmt.Sprint(rec.Descriptor().Name(), "/", encoded) + rencoded := rec.Resource().Encoded(o.labelEncoder) + key := fmt.Sprint(rec.Descriptor().Name(), "/", encoded, "/", rencoded) var value float64 if s, ok := rec.Aggregator().(aggregator.Sum); ok { diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 9be8b17ca..8de0953e3 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -29,6 +29,7 @@ import ( internal "go.opentelemetry.io/otel/internal/metric" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" + "go.opentelemetry.io/otel/sdk/resource" ) type ( @@ -68,6 +69,9 @@ type ( // place for sorting during labels creation to avoid // allocation. It is cleared after use. asyncSortSlice label.Sortable + + // resource is applied to all records in this Accumulator. + resource *resource.Resource } syncInstrument struct { @@ -317,6 +321,7 @@ func NewAccumulator(integrator export.Integrator, opts ...Option) *Accumulator { integrator: integrator, errorHandler: c.ErrorHandler, asyncInstruments: internal.NewAsyncInstrumentState(c.ErrorHandler), + resource: c.Resource, } } @@ -472,7 +477,7 @@ func (m *Accumulator) checkpoint(ctx context.Context, descriptor *metric.Descrip } recorder.Checkpoint(ctx, descriptor) - exportRecord := export.NewRecord(descriptor, labels, recorder) + exportRecord := export.NewRecord(descriptor, labels, m.resource, recorder) err := m.integrator.Process(ctx, exportRecord) if err != nil { m.errorHandler(err)