1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2024-12-04 09:43:23 +02:00

Move Resource into the metric export Record (#739)

* Checkpoint

* Tests pass
This commit is contained in:
Joshua MacDonald 2020-05-18 17:44:28 -07:00 committed by GitHub
parent 5a534a0b00
commit 69da3056f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 172 additions and 154 deletions

View File

@ -66,7 +66,7 @@ func ExampleNewExportPipeline() {
// Simulate a push
meterImpl.Collect(ctx)
err = exporter.Export(ctx, nil, integrator.CheckpointSet())
err = exporter.Export(ctx, integrator.CheckpointSet())
if err != nil {
panic(err)
}

View File

@ -32,7 +32,6 @@ import (
"go.opentelemetry.io/otel/sdk/metric/controller/push"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"
)
// Exporter is an implementation of metric.Exporter that sends metrics to
@ -169,8 +168,7 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, h
}
// Export exports the provide metric record to prometheus.
func (e *Exporter) Export(_ context.Context, _ *resource.Resource, checkpointSet export.CheckpointSet) error {
// TODO: Use the resource value in this exporter.
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
e.snapshot = checkpointSet
return nil
}
@ -211,6 +209,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
err := c.exp.snapshot.ForEach(func(record export.Record) error {
agg := record.Aggregator()
numberKind := record.Descriptor().NumberKind()
// TODO: Use the resource value in this record.
labels := labelValues(record.Labels())
desc := c.toDesc(&record)

View File

@ -39,7 +39,7 @@ func TestPrometheusExporter(t *testing.T) {
}
var expected []string
checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(nil)
counter := metric.NewDescriptor(
"counter", metric.CounterKind, metric.Float64NumberKind)
@ -117,7 +117,7 @@ func TestPrometheusExporter(t *testing.T) {
}
func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *test.CheckpointSet, expected []string) {
err := exporter.Export(context.Background(), nil, checkpointSet)
err := exporter.Export(context.Background(), checkpointSet)
require.Nil(t, err)
rec := httptest.NewRecorder()

View File

@ -25,7 +25,6 @@ import (
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/sdk/resource"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
@ -145,18 +144,18 @@ func NewExportPipeline(config Config, period time.Duration, opts ...push.Option)
return pusher, nil
}
func (e *Exporter) Export(_ context.Context, resource *resource.Resource, checkpointSet export.CheckpointSet) error {
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
var aggError error
var batch expoBatch
if !e.config.DoNotPrintTime {
ts := time.Now()
batch.Timestamp = &ts
}
encodedResource := resource.Encoded(e.config.LabelEncoder)
aggError = checkpointSet.ForEach(func(record export.Record) error {
desc := record.Descriptor()
agg := record.Aggregator()
kind := desc.NumberKind()
encodedResource := record.Resource().Encoded(e.config.LabelEncoder)
var expose expoLine

View File

@ -44,10 +44,11 @@ type testFixture struct {
ctx context.Context
exporter *stdout.Exporter
output *bytes.Buffer
resource *resource.Resource
}
func newFixture(t *testing.T, resource *resource.Resource, config stdout.Config) testFixture {
var testResource = resource.New(kv.String("R", "V"))
func newFixture(t *testing.T, config stdout.Config) testFixture {
buf := &bytes.Buffer{}
config.Writer = buf
config.DoNotPrintTime = true
@ -60,7 +61,6 @@ func newFixture(t *testing.T, resource *resource.Resource, config stdout.Config)
ctx: context.Background(),
exporter: exp,
output: buf,
resource: resource,
}
}
@ -69,7 +69,7 @@ func (fix testFixture) Output() string {
}
func (fix testFixture) Export(checkpointSet export.CheckpointSet) {
err := fix.exporter.Export(fix.ctx, fix.resource, checkpointSet)
err := fix.exporter.Export(fix.ctx, checkpointSet)
if err != nil {
fix.t.Error("export failed: ", err)
}
@ -95,7 +95,7 @@ func TestStdoutTimestamp(t *testing.T) {
before := time.Now()
checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)
ctx := context.Background()
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Int64NumberKind)
@ -105,7 +105,7 @@ func TestStdoutTimestamp(t *testing.T) {
checkpointSet.Add(&desc, lvagg)
if err := exporter.Export(ctx, nil, checkpointSet); err != nil {
if err := exporter.Export(ctx, checkpointSet); err != nil {
t.Fatal("Unexpected export error: ", err)
}
@ -139,9 +139,9 @@ func TestStdoutTimestamp(t *testing.T) {
}
func TestStdoutCounterFormat(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})
checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)
desc := metric.NewDescriptor("test.name", metric.CounterKind, metric.Int64NumberKind)
cagg := sum.New()
@ -152,13 +152,13 @@ func TestStdoutCounterFormat(t *testing.T) {
fix.Export(checkpointSet)
require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","sum":123}]}`, fix.Output())
require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","sum":123}]}`, fix.Output())
}
func TestStdoutLastValueFormat(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})
checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
@ -169,13 +169,13 @@ func TestStdoutLastValueFormat(t *testing.T) {
fix.Export(checkpointSet)
require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","last":123.456}]}`, fix.Output())
require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","last":123.456}]}`, fix.Output())
}
func TestStdoutMinMaxSumCount(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})
checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)
desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind)
magg := minmaxsumcount.New(&desc)
@ -187,15 +187,15 @@ func TestStdoutMinMaxSumCount(t *testing.T) {
fix.Export(checkpointSet)
require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","min":123.456,"max":876.543,"sum":999.999,"count":2}]}`, fix.Output())
require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","min":123.456,"max":876.543,"sum":999.999,"count":2}]}`, fix.Output())
}
func TestStdoutValueRecorderFormat(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{
fix := newFixture(t, stdout.Config{
PrettyPrint: true,
})
checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)
desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind)
magg := array.New()
@ -213,7 +213,7 @@ func TestStdoutValueRecorderFormat(t *testing.T) {
require.Equal(t, `{
"updates": [
{
"name": "test.name{A=B,C=D}",
"name": "test.name{R=V,A=B,C=D}",
"min": 0.5,
"max": 999.5,
"sum": 500000,
@ -247,9 +247,9 @@ func TestStdoutNoData(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})
checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)
magg := tc
magg.Checkpoint(fix.ctx, &desc)
@ -264,9 +264,9 @@ func TestStdoutNoData(t *testing.T) {
}
func TestStdoutLastValueNotSet(t *testing.T) {
fix := newFixture(t, nil, stdout.Config{})
fix := newFixture(t, stdout.Config{})
checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
@ -314,9 +314,9 @@ func TestStdoutResource(t *testing.T) {
}
for _, tc := range testCases {
fix := newFixture(t, tc.res, stdout.Config{})
fix := newFixture(t, stdout.Config{})
checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(tc.res)
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()

View File

@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/resource"
)
type mapkey struct {
@ -35,15 +36,17 @@ type mapkey struct {
}
type CheckpointSet struct {
records map[mapkey]export.Record
updates []export.Record
records map[mapkey]export.Record
resource *resource.Resource
updates []export.Record
}
// NewCheckpointSet returns a test CheckpointSet that new records could be added.
// Records are grouped by their encoded labels.
func NewCheckpointSet() *CheckpointSet {
func NewCheckpointSet(resource *resource.Resource) *CheckpointSet {
return &CheckpointSet{
records: make(map[mapkey]export.Record),
records: make(map[mapkey]export.Record),
resource: resource,
}
}
@ -67,7 +70,7 @@ func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, l
return record.Aggregator(), false
}
rec := export.NewRecord(desc, &elabels, newAgg)
rec := export.NewRecord(desc, &elabels, p.resource, newAgg)
p.updates = append(p.updates, rec)
p.records[key] = rec
return newAgg, true

View File

@ -61,7 +61,7 @@ type result struct {
// CheckpointSet transforms all records contained in a checkpoint into
// batched OTLP ResourceMetrics.
func CheckpointSet(ctx context.Context, resource *resource.Resource, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
func CheckpointSet(ctx context.Context, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
records, errc := source(ctx, cps)
// Start a fixed number of goroutines to transform records.
@ -71,7 +71,7 @@ func CheckpointSet(ctx context.Context, resource *resource.Resource, cps export.
for i := uint(0); i < numWorkers; i++ {
go func() {
defer wg.Done()
transformer(ctx, resource, records, transformed)
transformer(ctx, records, transformed)
}()
}
go func() {
@ -116,7 +116,7 @@ func source(ctx context.Context, cps export.CheckpointSet) (<-chan export.Record
// transformer transforms records read from the passed in chan into
// OTLP Metrics which are sent on the out chan.
func transformer(ctx context.Context, resource *resource.Resource, in <-chan export.Record, out chan<- result) {
func transformer(ctx context.Context, in <-chan export.Record, out chan<- result) {
for r := range in {
m, err := Record(r)
// Propagate errors, but do not send empty results.
@ -124,7 +124,7 @@ func transformer(ctx context.Context, resource *resource.Resource, in <-chan exp
continue
}
res := result{
Resource: resource,
Resource: r.Resource(),
Library: r.Descriptor().LibraryName(),
Metric: m,
Err: err,

View File

@ -31,7 +31,6 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/resource"
)
type Exporter struct {
@ -212,7 +211,7 @@ func (e *Exporter) Stop() error {
// Export implements the "go.opentelemetry.io/otel/sdk/export/metric".Exporter
// interface. It transforms and batches metric Records into OTLP Metrics and
// transmits them to the configured collector.
func (e *Exporter) Export(parent context.Context, resource *resource.Resource, cps metricsdk.CheckpointSet) error {
func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error {
// Unify the parent context Done signal with the exporter stopCh.
ctx, cancel := context.WithCancel(parent)
defer cancel()
@ -224,7 +223,7 @@ func (e *Exporter) Export(parent context.Context, resource *resource.Resource, c
}
}(ctx, cancel)
rms, err := transform.CheckpointSet(ctx, resource, cps, e.c.numWorkers)
rms, err := transform.CheckpointSet(ctx, cps, e.c.numWorkers)
if err != nil {
return err
}

View File

@ -659,11 +659,10 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
equiv := r.resource.Equivalent()
resources[equiv] = r.resource
recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, agg))
recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, r.resource, agg))
}
for equiv, records := range recs {
resource := resources[equiv]
assert.NoError(t, exp.Export(context.Background(), resource, checkpointSet{records: records}))
for _, records := range recs {
assert.NoError(t, exp.Export(context.Background(), checkpointSet{records: records}))
}
// assert.ElementsMatch does not equate nested slices of different order,
@ -713,8 +712,6 @@ func TestEmptyMetricExport(t *testing.T) {
exp.metricExporter = msc
exp.started = true
resource := resource.New(kv.String("R", "S"))
for _, test := range []struct {
records []metricsdk.Record
want []metricpb.ResourceMetrics
@ -729,7 +726,7 @@ func TestEmptyMetricExport(t *testing.T) {
},
} {
msc.Reset()
require.NoError(t, exp.Export(context.Background(), resource, checkpointSet{records: test.records}))
require.NoError(t, exp.Export(context.Background(), checkpointSet{records: test.records}))
assert.Equal(t, test.want, msc.ResourceMetrics())
}
}

View File

@ -154,12 +154,9 @@ type Exporter interface {
// The Context comes from the controller that initiated
// collection.
//
// The Resource contains common attributes that apply to all
// metric events in the SDK.
//
// The CheckpointSet interface refers to the Integrator that just
// completed collection.
Export(context.Context, *resource.Resource, CheckpointSet) error
Export(context.Context, CheckpointSet) error
}
// CheckpointSet allows a controller to access a complete checkpoint of
@ -183,16 +180,18 @@ type CheckpointSet interface {
type Record struct {
descriptor *metric.Descriptor
labels *label.Set
resource *resource.Resource
aggregator Aggregator
}
// NewRecord allows Integrator implementations to construct export
// records. The Descriptor, Labels, and Aggregator represent
// aggregate metric events received over a single collection period.
func NewRecord(descriptor *metric.Descriptor, labels *label.Set, aggregator Aggregator) Record {
func NewRecord(descriptor *metric.Descriptor, labels *label.Set, resource *resource.Resource, aggregator Aggregator) Record {
return Record{
descriptor: descriptor,
labels: labels,
resource: resource,
aggregator: aggregator,
}
}
@ -213,3 +212,8 @@ func (r Record) Descriptor() *metric.Descriptor {
func (r Record) Labels() *label.Set {
return r.labels
}
// Resource contains common attributes that apply to this metric event.
func (r Record) Resource() *resource.Resource {
return r.resource
}

View File

@ -14,6 +14,8 @@
package metric
import "go.opentelemetry.io/otel/sdk/resource"
// Config contains configuration for an SDK.
type Config struct {
// ErrorHandler is the function called when the SDK encounters an error.
@ -21,6 +23,10 @@ type Config struct {
// This option can be overridden after instantiation of the SDK
// with the `SetErrorHandler` method.
ErrorHandler ErrorHandler
// Resource describes all the metric records processed by the
// Accumulator.
Resource *resource.Resource
}
// Option is the interface that applies the value to a configuration option.
@ -39,3 +45,16 @@ type errorHandlerOption ErrorHandler
func (o errorHandlerOption) Apply(config *Config) {
config.ErrorHandler = ErrorHandler(o)
}
// WithResource sets the Resource configuration option of a Config.
func WithResource(res *resource.Resource) Option {
return resourceOption{res}
}
type resourceOption struct {
*resource.Resource
}
func (o resourceOption) Apply(config *Config) {
config.Resource = o.Resource
}

View File

@ -31,7 +31,7 @@ type Controller struct {
lock sync.Mutex
collectLock sync.Mutex
accumulator *sdk.Accumulator
resource *resource.Resource
provider *registry.Provider
errorHandler sdk.ErrorHandler
integrator export.Integrator
exporter export.Exporter
@ -40,7 +40,6 @@ type Controller struct {
period time.Duration
ticker Ticker
clock Clock
provider *registry.Provider
}
// Several types below are created to match "github.com/benbjohnson/clock"
@ -71,15 +70,17 @@ var _ Ticker = realTicker{}
// configuration options to configure an SDK with periodic collection.
// The integrator itself is configured with the aggregation selector policy.
func New(integrator export.Integrator, exporter export.Exporter, period time.Duration, opts ...Option) *Controller {
c := &Config{ErrorHandler: sdk.DefaultErrorHandler}
c := &Config{
ErrorHandler: sdk.DefaultErrorHandler,
Resource: resource.Empty(),
}
for _, opt := range opts {
opt.Apply(c)
}
impl := sdk.NewAccumulator(integrator, sdk.WithErrorHandler(c.ErrorHandler))
impl := sdk.NewAccumulator(integrator, sdk.WithErrorHandler(c.ErrorHandler), sdk.WithResource(c.Resource))
return &Controller{
accumulator: impl,
resource: c.Resource,
provider: registry.NewProvider(impl),
errorHandler: c.ErrorHandler,
integrator: integrator,
@ -166,7 +167,7 @@ func (c *Controller) tick() {
mtx: &c.collectLock,
delegate: c.integrator.CheckpointSet(),
}
err := c.exporter.Export(ctx, c.resource, checkpointSet)
err := c.exporter.Export(ctx, checkpointSet)
c.integrator.FinishedCollection()
if err != nil {

View File

@ -25,6 +25,8 @@ import (
"github.com/benbjohnson/clock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/test"
export "go.opentelemetry.io/otel/sdk/export/metric"
@ -42,6 +44,8 @@ type testIntegrator struct {
finishes int
}
var testResource = resource.New(kv.String("R", "V"))
type testExporter struct {
t *testing.T
lock sync.Mutex
@ -68,7 +72,7 @@ var _ push.Clock = mockClock{}
var _ push.Ticker = mockTicker{}
func newFixture(t *testing.T) testFixture {
checkpointSet := test.NewCheckpointSet()
checkpointSet := test.NewCheckpointSet(testResource)
integrator := &testIntegrator{
t: t,
@ -115,7 +119,7 @@ func (b *testIntegrator) getCounts() (checkpoints, finishes int) {
return b.checkpoints, b.finishes
}
func (e *testExporter) Export(_ context.Context, _ *resource.Resource, checkpointSet export.CheckpointSet) error {
func (e *testExporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
e.lock.Lock()
defer e.lock.Unlock()
e.exports++
@ -213,6 +217,7 @@ func TestPushTicker(t *testing.T) {
require.Equal(t, 1, exports)
require.Equal(t, 1, len(records))
require.Equal(t, "counter", records[0].Descriptor().Name())
require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder()))
sum, err := records[0].Aggregator().(aggregator.Sum).Sum()
require.Equal(t, int64(3), sum.AsInt64())
@ -232,6 +237,7 @@ func TestPushTicker(t *testing.T) {
require.Equal(t, 2, exports)
require.Equal(t, 1, len(records))
require.Equal(t, "counter", records[0].Descriptor().Name())
require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder()))
sum, err = records[0].Aggregator().(aggregator.Sum).Sum()
require.Equal(t, int64(7), sum.AsInt64())
@ -256,8 +262,8 @@ func TestPushExportError(t *testing.T) {
expectedDescriptors []string
expectedError error
}{
{"errNone", nil, []string{"counter1", "counter2"}, nil},
{"errNoData", aggregator.ErrNoData, []string{"counter2"}, nil},
{"errNone", nil, []string{"counter1{R=V,X=Y}", "counter2{R=V,}"}, nil},
{"errNoData", aggregator.ErrNoData, []string{"counter2{R=V,}"}, nil},
{"errUnexpected", errAggregator, []string{}, errAggregator},
}
for _, tt := range tests {
@ -287,7 +293,7 @@ func TestPushExportError(t *testing.T) {
p.Start()
runtime.Gosched()
counter1.Add(ctx, 3)
counter1.Add(ctx, 3, kv.String("X", "Y"))
counter2.Add(ctx, 5)
require.Equal(t, 0, fix.exporter.exports)
@ -311,11 +317,16 @@ func TestPushExportError(t *testing.T) {
lock.Unlock()
require.Equal(t, len(tt.expectedDescriptors), len(records))
for _, r := range records {
require.Contains(t, tt.expectedDescriptors, r.Descriptor().Name())
require.Contains(t, tt.expectedDescriptors,
fmt.Sprintf("%s{%s,%s}",
r.Descriptor().Name(),
r.Resource().Encoded(label.DefaultEncoder()),
r.Labels().Encoded(label.DefaultEncoder()),
),
)
}
p.Stop()
})
}
}

View File

@ -33,9 +33,11 @@ import (
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
batchTest "go.opentelemetry.io/otel/sdk/metric/integrator/test"
"go.opentelemetry.io/otel/sdk/resource"
)
var Must = metric.Must
var testResource = resource.New(kv.String("R", "V"))
type correctnessIntegrator struct {
newAggCount int64
@ -45,6 +47,15 @@ type correctnessIntegrator struct {
records []export.Record
}
func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessIntegrator) {
integrator := &correctnessIntegrator{
t: t,
}
accum := metricsdk.NewAccumulator(integrator, metricsdk.WithResource(testResource))
meter := metric.WrapMeterImpl(accum, "test")
return meter, accum, integrator
}
func (cb *correctnessIntegrator) AggregatorFor(descriptor *metric.Descriptor) (agg export.Aggregator) {
name := descriptor.Name()
@ -77,11 +88,7 @@ func (cb *correctnessIntegrator) Process(_ context.Context, record export.Record
func TestInputRangeTestCounter(t *testing.T) {
ctx := context.Background()
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
meter, sdk, integrator := newSDK(t)
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
@ -109,11 +116,7 @@ func TestInputRangeTestCounter(t *testing.T) {
func TestInputRangeTestValueRecorder(t *testing.T) {
ctx := context.Background()
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
meter, sdk, integrator := newSDK(t)
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
@ -144,11 +147,7 @@ func TestInputRangeTestValueRecorder(t *testing.T) {
func TestDisabledInstrument(t *testing.T) {
ctx := context.Background()
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
meter, sdk, integrator := newSDK(t)
valuerecorder := Must(meter).NewFloat64ValueRecorder("name.disabled")
@ -161,12 +160,7 @@ func TestDisabledInstrument(t *testing.T) {
func TestRecordNaN(t *testing.T) {
ctx := context.Background()
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
meter, sdk, _ := newSDK(t)
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
@ -181,11 +175,7 @@ func TestRecordNaN(t *testing.T) {
func TestSDKLabelsDeduplication(t *testing.T) {
ctx := context.Background()
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
meter, sdk, integrator := newSDK(t)
counter := Must(meter).NewInt64Counter("counter")
@ -284,12 +274,7 @@ func TestDefaultLabelEncoder(t *testing.T) {
func TestObserverCollection(t *testing.T) {
ctx := context.Background()
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
meter, sdk, integrator := newSDK(t)
_ = Must(meter).RegisterFloat64ValueObserver("float.valueobserver", func(result metric.Float64ObserverResult) {
result.Observe(1, kv.String("A", "B"))
@ -317,21 +302,16 @@ func TestObserverCollection(t *testing.T) {
_ = out.AddTo(rec)
}
require.EqualValues(t, map[string]float64{
"float.valueobserver/A=B": -1,
"float.valueobserver/C=D": -1,
"int.valueobserver/": 1,
"int.valueobserver/A=B": 1,
"float.valueobserver/A=B/R=V": -1,
"float.valueobserver/C=D/R=V": -1,
"int.valueobserver//R=V": 1,
"int.valueobserver/A=B/R=V": 1,
}, out.Map)
}
func TestObserverBatch(t *testing.T) {
ctx := context.Background()
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
meter, sdk, integrator := newSDK(t)
var floatObs metric.Float64ValueObserver
var intObs metric.Int64ValueObserver
@ -371,21 +351,16 @@ func TestObserverBatch(t *testing.T) {
_ = out.AddTo(rec)
}
require.EqualValues(t, map[string]float64{
"float.valueobserver/A=B": -1,
"float.valueobserver/C=D": -1,
"int.valueobserver/": 1,
"int.valueobserver/A=B": 1,
"float.valueobserver/A=B/R=V": -1,
"float.valueobserver/C=D/R=V": -1,
"int.valueobserver//R=V": 1,
"int.valueobserver/A=B/R=V": 1,
}, out.Map)
}
func TestRecordBatch(t *testing.T) {
ctx := context.Background()
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
meter, sdk, integrator := newSDK(t)
counter1 := Must(meter).NewInt64Counter("int64.counter")
counter2 := Must(meter).NewFloat64Counter("float64.counter")
@ -411,10 +386,10 @@ func TestRecordBatch(t *testing.T) {
_ = out.AddTo(rec)
}
require.EqualValues(t, map[string]float64{
"int64.counter/A=B,C=D": 1,
"float64.counter/A=B,C=D": 2,
"int64.valuerecorder/A=B,C=D": 3,
"float64.valuerecorder/A=B,C=D": 4,
"int64.counter/A=B,C=D/R=V": 1,
"float64.counter/A=B,C=D/R=V": 2,
"int64.valuerecorder/A=B,C=D/R=V": 3,
"float64.valuerecorder/A=B,C=D/R=V": 4,
}, out.Map)
}
@ -423,12 +398,7 @@ func TestRecordBatch(t *testing.T) {
// that its encoded labels will be cached across collection intervals.
func TestRecordPersistence(t *testing.T) {
ctx := context.Background()
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
meter, sdk, integrator := newSDK(t)
c := Must(meter).NewFloat64Counter("sum.name")
b := c.Bind(kv.String("bound", "true"))

View File

@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/resource"
)
type (
@ -34,11 +35,13 @@ type (
batchKey struct {
descriptor *metric.Descriptor
distinct label.Distinct
resource label.Distinct
}
batchValue struct {
aggregator export.Aggregator
labels *label.Set
resource *resource.Resource
}
batchMap map[batchKey]batchValue
@ -64,6 +67,7 @@ func (b *Integrator) Process(_ context.Context, record export.Record) error {
key := batchKey{
descriptor: desc,
distinct: record.Labels().Equivalent(),
resource: record.Resource().Equivalent(),
}
agg := record.Aggregator()
value, ok := b.batchMap[key]
@ -91,6 +95,7 @@ func (b *Integrator) Process(_ context.Context, record export.Record) error {
b.batchMap[key] = batchValue{
aggregator: agg,
labels: record.Labels(),
resource: record.Resource(),
}
return nil
}
@ -110,6 +115,7 @@ func (c batchMap) ForEach(f func(export.Record) error) error {
if err := f(export.NewRecord(
key.descriptor,
value.labels,
value.resource,
value.aggregator,
)); err != nil && !errors.Is(err, aggregator.ErrNoData) {
return err

View File

@ -68,18 +68,18 @@ func TestUngroupedStateless(t *testing.T) {
// Output lastvalue should have only the "G=H" and "G=" keys.
// Output counter should have only the "C=D" and "C=" keys.
require.EqualValues(t, map[string]float64{
"sum.a/C~D&G~H": 60, // labels1
"sum.a/C~D&E~F": 20, // labels2
"sum.a/": 40, // labels3
"sum.b/C~D&G~H": 60, // labels1
"sum.b/C~D&E~F": 20, // labels2
"sum.b/": 40, // labels3
"lastvalue.a/C~D&G~H": 50, // labels1
"lastvalue.a/C~D&E~F": 20, // labels2
"lastvalue.a/": 30, // labels3
"lastvalue.b/C~D&G~H": 50, // labels1
"lastvalue.b/C~D&E~F": 20, // labels2
"lastvalue.b/": 30, // labels3
"sum.a/C~D&G~H/R~V": 60, // labels1
"sum.a/C~D&E~F/R~V": 20, // labels2
"sum.a//R~V": 40, // labels3
"sum.b/C~D&G~H/R~V": 60, // labels1
"sum.b/C~D&E~F/R~V": 20, // labels2
"sum.b//R~V": 40, // labels3
"lastvalue.a/C~D&G~H/R~V": 50, // labels1
"lastvalue.a/C~D&E~F/R~V": 20, // labels2
"lastvalue.a//R~V": 30, // labels3
"lastvalue.b/C~D&G~H/R~V": 50, // labels1
"lastvalue.b/C~D&E~F/R~V": 20, // labels2
"lastvalue.b//R~V": 30, // labels3
}, records.Map)
// Verify that state was reset
@ -110,8 +110,8 @@ func TestUngroupedStateful(t *testing.T) {
_ = checkpointSet.ForEach(records1.AddTo)
require.EqualValues(t, map[string]float64{
"sum.a/C~D&G~H": 10, // labels1
"sum.b/C~D&G~H": 10, // labels1
"sum.a/C~D&G~H/R~V": 10, // labels1
"sum.b/C~D&G~H/R~V": 10, // labels1
}, records1.Map)
// Test that state was NOT reset
@ -140,8 +140,8 @@ func TestUngroupedStateful(t *testing.T) {
require.EqualValues(t, records1.Map, records3.Map)
// Now process the second update
_ = b.Process(ctx, export.NewRecord(&test.CounterADesc, test.Labels1, caggA))
_ = b.Process(ctx, export.NewRecord(&test.CounterBDesc, test.Labels1, caggB))
_ = b.Process(ctx, export.NewRecord(&test.CounterADesc, test.Labels1, test.Resource, caggA))
_ = b.Process(ctx, export.NewRecord(&test.CounterBDesc, test.Labels1, test.Resource, caggB))
checkpointSet = b.CheckpointSet()
b.FinishedCollection()
@ -150,7 +150,7 @@ func TestUngroupedStateful(t *testing.T) {
_ = checkpointSet.ForEach(records4.AddTo)
require.EqualValues(t, map[string]float64{
"sum.a/C~D&G~H": 30,
"sum.b/C~D&G~H": 30,
"sum.a/C~D&G~H/R~V": 30,
"sum.b/C~D&G~H/R~V": 30,
}, records4.Map)
}

View File

@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/resource"
)
type (
@ -45,6 +46,9 @@ type (
)
var (
// Resource is applied to all test records built in this package.
Resource = resource.New(kv.String("R", "V"))
// LastValueADesc and LastValueBDesc group by "G"
LastValueADesc = metric.NewDescriptor(
"lastvalue.a", metric.ValueObserverKind, metric.Int64NumberKind)
@ -133,12 +137,12 @@ func LastValueAgg(desc *metric.Descriptor, v int64) export.Aggregator {
// Convenience method for building a test exported lastValue record.
func NewLastValueRecord(desc *metric.Descriptor, labels *label.Set, value int64) export.Record {
return export.NewRecord(desc, labels, LastValueAgg(desc, value))
return export.NewRecord(desc, labels, Resource, LastValueAgg(desc, value))
}
// Convenience method for building a test exported counter record.
func NewCounterRecord(desc *metric.Descriptor, labels *label.Set, value int64) export.Record {
return export.NewRecord(desc, labels, CounterAgg(desc, value))
return export.NewRecord(desc, labels, Resource, CounterAgg(desc, value))
}
// CounterAgg returns a checkpointed counter aggregator w/ the specified descriptor and value.
@ -154,7 +158,8 @@ func CounterAgg(desc *metric.Descriptor, v int64) export.Aggregator {
// value to the output map.
func (o Output) AddTo(rec export.Record) error {
encoded := rec.Labels().Encoded(o.labelEncoder)
key := fmt.Sprint(rec.Descriptor().Name(), "/", encoded)
rencoded := rec.Resource().Encoded(o.labelEncoder)
key := fmt.Sprint(rec.Descriptor().Name(), "/", encoded, "/", rencoded)
var value float64
if s, ok := rec.Aggregator().(aggregator.Sum); ok {

View File

@ -29,6 +29,7 @@ import (
internal "go.opentelemetry.io/otel/internal/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/resource"
)
type (
@ -68,6 +69,9 @@ type (
// place for sorting during labels creation to avoid
// allocation. It is cleared after use.
asyncSortSlice label.Sortable
// resource is applied to all records in this Accumulator.
resource *resource.Resource
}
syncInstrument struct {
@ -317,6 +321,7 @@ func NewAccumulator(integrator export.Integrator, opts ...Option) *Accumulator {
integrator: integrator,
errorHandler: c.ErrorHandler,
asyncInstruments: internal.NewAsyncInstrumentState(c.ErrorHandler),
resource: c.Resource,
}
}
@ -472,7 +477,7 @@ func (m *Accumulator) checkpoint(ctx context.Context, descriptor *metric.Descrip
}
recorder.Checkpoint(ctx, descriptor)
exportRecord := export.NewRecord(descriptor, labels, recorder)
exportRecord := export.NewRecord(descriptor, labels, m.resource, recorder)
err := m.integrator.Process(ctx, exportRecord)
if err != nil {
m.errorHandler(err)