You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-09-16 09:26:25 +02:00
Pass Resources through the metrics export pipeline (#659)
This commit is contained in:
@@ -21,7 +21,6 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/api/core"
|
||||
"go.opentelemetry.io/otel/api/unit"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
// Provider supports named Meter instances.
|
||||
@@ -38,8 +37,6 @@ type Config struct {
|
||||
Description string
|
||||
// Unit is an optional field describing the metric instrument.
|
||||
Unit unit.Unit
|
||||
// Resource describes the entity for which measurements are made.
|
||||
Resource *resource.Resource
|
||||
// LibraryName is the name given to the Meter that created
|
||||
// this instrument. See `Provider`.
|
||||
LibraryName string
|
||||
@@ -132,12 +129,6 @@ func (d Descriptor) NumberKind() core.NumberKind {
|
||||
return d.numberKind
|
||||
}
|
||||
|
||||
// Resource returns the Resource describing the entity for which the metric
|
||||
// instrument measures.
|
||||
func (d Descriptor) Resource() *resource.Resource {
|
||||
return d.config.Resource
|
||||
}
|
||||
|
||||
// LibraryName returns the metric instrument's library name, typically
|
||||
// given via a call to Provider.Meter().
|
||||
func (d Descriptor) LibraryName() string {
|
||||
@@ -200,19 +191,6 @@ func (u unitOption) Apply(config *Config) {
|
||||
config.Unit = unit.Unit(u)
|
||||
}
|
||||
|
||||
// WithResource applies provided Resource.
|
||||
//
|
||||
// This will override any existing Resource.
|
||||
func WithResource(r *resource.Resource) Option {
|
||||
return resourceOption{r}
|
||||
}
|
||||
|
||||
type resourceOption struct{ *resource.Resource }
|
||||
|
||||
func (r resourceOption) Apply(config *Config) {
|
||||
config.Resource = r.Resource
|
||||
}
|
||||
|
||||
// WithLibraryName applies provided library name. This is meant for
|
||||
// use in `Provider` implementations that have not used
|
||||
// `WrapMeterImpl`. Implementations built using `WrapMeterImpl` have
|
||||
|
@@ -25,7 +25,6 @@ import (
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
"go.opentelemetry.io/otel/api/unit"
|
||||
mockTest "go.opentelemetry.io/otel/internal/metric"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -36,28 +35,25 @@ var Must = metric.Must
|
||||
|
||||
func TestOptions(t *testing.T) {
|
||||
type testcase struct {
|
||||
name string
|
||||
opts []metric.Option
|
||||
desc string
|
||||
unit unit.Unit
|
||||
resource *resource.Resource
|
||||
name string
|
||||
opts []metric.Option
|
||||
desc string
|
||||
unit unit.Unit
|
||||
}
|
||||
testcases := []testcase{
|
||||
{
|
||||
name: "no opts",
|
||||
opts: nil,
|
||||
desc: "",
|
||||
unit: "",
|
||||
resource: nil,
|
||||
name: "no opts",
|
||||
opts: nil,
|
||||
desc: "",
|
||||
unit: "",
|
||||
},
|
||||
{
|
||||
name: "description",
|
||||
opts: []metric.Option{
|
||||
metric.WithDescription("stuff"),
|
||||
},
|
||||
desc: "stuff",
|
||||
unit: "",
|
||||
resource: nil,
|
||||
desc: "stuff",
|
||||
unit: "",
|
||||
},
|
||||
{
|
||||
name: "description override",
|
||||
@@ -65,18 +61,16 @@ func TestOptions(t *testing.T) {
|
||||
metric.WithDescription("stuff"),
|
||||
metric.WithDescription("things"),
|
||||
},
|
||||
desc: "things",
|
||||
unit: "",
|
||||
resource: nil,
|
||||
desc: "things",
|
||||
unit: "",
|
||||
},
|
||||
{
|
||||
name: "unit",
|
||||
opts: []metric.Option{
|
||||
metric.WithUnit("s"),
|
||||
},
|
||||
desc: "",
|
||||
unit: "s",
|
||||
resource: nil,
|
||||
desc: "",
|
||||
unit: "s",
|
||||
},
|
||||
{
|
||||
name: "unit override",
|
||||
@@ -84,18 +78,8 @@ func TestOptions(t *testing.T) {
|
||||
metric.WithUnit("s"),
|
||||
metric.WithUnit("h"),
|
||||
},
|
||||
desc: "",
|
||||
unit: "h",
|
||||
resource: nil,
|
||||
},
|
||||
{
|
||||
name: "resource override",
|
||||
opts: []metric.Option{
|
||||
metric.WithResource(resource.New(key.New("name").String("test-name"))),
|
||||
},
|
||||
desc: "",
|
||||
unit: "",
|
||||
resource: resource.New(key.New("name").String("test-name")),
|
||||
desc: "",
|
||||
unit: "h",
|
||||
},
|
||||
}
|
||||
for idx, tt := range testcases {
|
||||
@@ -103,7 +87,6 @@ func TestOptions(t *testing.T) {
|
||||
if diff := cmp.Diff(metric.Configure(tt.opts), metric.Config{
|
||||
Description: tt.desc,
|
||||
Unit: tt.unit,
|
||||
Resource: tt.resource,
|
||||
}); diff != "" {
|
||||
t.Errorf("Compare options: -got +want %s", diff)
|
||||
}
|
||||
|
@@ -18,7 +18,6 @@ import (
|
||||
"context"
|
||||
|
||||
"go.opentelemetry.io/otel/api/core"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
// MeterImpl is a convenient interface for SDK and test
|
||||
@@ -122,29 +121,6 @@ func Configure(opts []Option) Config {
|
||||
return config
|
||||
}
|
||||
|
||||
// Resourcer is implemented by any value that has a Resource method,
|
||||
// which returns the Resource associated with the value.
|
||||
// The Resource method is used to set the Resource for Descriptors of new
|
||||
// metric instruments.
|
||||
type Resourcer interface {
|
||||
Resource() *resource.Resource
|
||||
}
|
||||
|
||||
// insertResource inserts a WithResource option at the beginning of opts
|
||||
// using the resource defined by impl if impl implements Resourcer.
|
||||
//
|
||||
// If opts contains a WithResource option already, that Option will take
|
||||
// precedence and overwrite the Resource set from impl.
|
||||
//
|
||||
// The returned []Option may uses the same underlying array as opts.
|
||||
func insertResource(impl MeterImpl, opts []Option) []Option {
|
||||
if r, ok := impl.(Resourcer); ok {
|
||||
// default to the impl resource and override if passed in opts.
|
||||
return append([]Option{WithResource(r.Resource())}, opts...)
|
||||
}
|
||||
return opts
|
||||
}
|
||||
|
||||
// WrapMeterImpl constructs a `Meter` implementation from a
|
||||
// `MeterImpl` implementation.
|
||||
func WrapMeterImpl(impl MeterImpl, libraryName string) Meter {
|
||||
@@ -159,7 +135,6 @@ func (m *wrappedMeterImpl) RecordBatch(ctx context.Context, ls []core.KeyValue,
|
||||
}
|
||||
|
||||
func (m *wrappedMeterImpl) newSync(name string, metricKind Kind, numberKind core.NumberKind, opts []Option) (SyncImpl, error) {
|
||||
opts = insertResource(m.impl, opts)
|
||||
desc := NewDescriptor(name, metricKind, numberKind, opts...)
|
||||
desc.config.LibraryName = m.libraryName
|
||||
return m.impl.NewSyncInstrument(desc)
|
||||
@@ -222,7 +197,6 @@ func WrapFloat64MeasureInstrument(syncInst SyncImpl, err error) (Float64Measure,
|
||||
}
|
||||
|
||||
func (m *wrappedMeterImpl) newAsync(name string, mkind Kind, nkind core.NumberKind, opts []Option, callback func(func(core.Number, []core.KeyValue))) (AsyncImpl, error) {
|
||||
opts = insertResource(m.impl, opts)
|
||||
desc := NewDescriptor(name, mkind, nkind, opts...)
|
||||
desc.config.LibraryName = m.libraryName
|
||||
return m.impl.NewAsyncInstrument(desc, callback)
|
||||
|
@@ -31,6 +31,7 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
|
||||
"go.opentelemetry.io/otel/sdk/metric/controller/push"
|
||||
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
// Exporter is an implementation of metric.Exporter that sends metrics to
|
||||
@@ -167,7 +168,8 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, h
|
||||
}
|
||||
|
||||
// Export exports the provide metric record to prometheus.
|
||||
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
|
||||
func (e *Exporter) Export(_ context.Context, _ *resource.Resource, checkpointSet export.CheckpointSet) error {
|
||||
// TODO: Use the resource value in this exporter.
|
||||
e.snapshot = checkpointSet
|
||||
return nil
|
||||
}
|
||||
|
@@ -26,7 +26,6 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/api/core"
|
||||
"go.opentelemetry.io/otel/api/key"
|
||||
"go.opentelemetry.io/otel/api/label"
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
"go.opentelemetry.io/otel/exporters/metric/prometheus"
|
||||
"go.opentelemetry.io/otel/exporters/metric/test"
|
||||
@@ -41,7 +40,7 @@ func TestPrometheusExporter(t *testing.T) {
|
||||
}
|
||||
|
||||
var expected []string
|
||||
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
|
||||
checkpointSet := test.NewCheckpointSet()
|
||||
|
||||
counter := metric.NewDescriptor(
|
||||
"counter", metric.CounterKind, core.Float64NumberKind)
|
||||
@@ -119,7 +118,7 @@ func TestPrometheusExporter(t *testing.T) {
|
||||
}
|
||||
|
||||
func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *test.CheckpointSet, expected []string) {
|
||||
err := exporter.Export(context.Background(), checkpointSet)
|
||||
err := exporter.Export(context.Background(), nil, checkpointSet)
|
||||
require.Nil(t, err)
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
|
@@ -25,6 +25,7 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/api/global"
|
||||
"go.opentelemetry.io/otel/api/label"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
@@ -120,8 +121,8 @@ func NewRawExporter(config Config) (*Exporter, error) {
|
||||
// }
|
||||
// defer pipeline.Stop()
|
||||
// ... Done
|
||||
func InstallNewPipeline(config Config) (*push.Controller, error) {
|
||||
controller, err := NewExportPipeline(config, time.Minute)
|
||||
func InstallNewPipeline(config Config, opts ...push.Option) (*push.Controller, error) {
|
||||
controller, err := NewExportPipeline(config, time.Minute, opts...)
|
||||
if err != nil {
|
||||
return controller, err
|
||||
}
|
||||
@@ -131,26 +132,27 @@ func InstallNewPipeline(config Config) (*push.Controller, error) {
|
||||
|
||||
// NewExportPipeline sets up a complete export pipeline with the recommended setup,
|
||||
// chaining a NewRawExporter into the recommended selectors and batchers.
|
||||
func NewExportPipeline(config Config, period time.Duration) (*push.Controller, error) {
|
||||
func NewExportPipeline(config Config, period time.Duration, opts ...push.Option) (*push.Controller, error) {
|
||||
selector := simple.NewWithExactMeasure()
|
||||
exporter, err := NewRawExporter(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
batcher := ungrouped.New(selector, true)
|
||||
pusher := push.New(batcher, exporter, period)
|
||||
pusher := push.New(batcher, exporter, period, opts...)
|
||||
pusher.Start()
|
||||
|
||||
return pusher, nil
|
||||
}
|
||||
|
||||
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
|
||||
func (e *Exporter) Export(_ context.Context, resource *resource.Resource, checkpointSet export.CheckpointSet) error {
|
||||
var aggError error
|
||||
var batch expoBatch
|
||||
if !e.config.DoNotPrintTime {
|
||||
ts := time.Now()
|
||||
batch.Timestamp = &ts
|
||||
}
|
||||
encodedResource := resource.Encoded(e.config.LabelEncoder)
|
||||
aggError = checkpointSet.ForEach(func(record export.Record) error {
|
||||
desc := record.Descriptor()
|
||||
agg := record.Aggregator()
|
||||
@@ -224,8 +226,12 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
|
||||
|
||||
sb.WriteString(desc.Name())
|
||||
|
||||
if len(encodedLabels) > 0 {
|
||||
if len(encodedLabels) > 0 || len(encodedResource) > 0 {
|
||||
sb.WriteRune('{')
|
||||
sb.WriteString(encodedResource)
|
||||
if len(encodedLabels) > 0 && len(encodedResource) > 0 {
|
||||
sb.WriteRune(',')
|
||||
}
|
||||
sb.WriteString(encodedLabels)
|
||||
sb.WriteRune('}')
|
||||
}
|
||||
|
@@ -26,7 +26,6 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/api/core"
|
||||
"go.opentelemetry.io/otel/api/key"
|
||||
"go.opentelemetry.io/otel/api/label"
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
"go.opentelemetry.io/otel/exporters/metric/stdout"
|
||||
"go.opentelemetry.io/otel/exporters/metric/test"
|
||||
@@ -38,6 +37,7 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
aggtest "go.opentelemetry.io/otel/sdk/metric/aggregator/test"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
type testFixture struct {
|
||||
@@ -45,9 +45,10 @@ type testFixture struct {
|
||||
ctx context.Context
|
||||
exporter *stdout.Exporter
|
||||
output *bytes.Buffer
|
||||
resource *resource.Resource
|
||||
}
|
||||
|
||||
func newFixture(t *testing.T, config stdout.Config) testFixture {
|
||||
func newFixture(t *testing.T, resource *resource.Resource, config stdout.Config) testFixture {
|
||||
buf := &bytes.Buffer{}
|
||||
config.Writer = buf
|
||||
config.DoNotPrintTime = true
|
||||
@@ -60,6 +61,7 @@ func newFixture(t *testing.T, config stdout.Config) testFixture {
|
||||
ctx: context.Background(),
|
||||
exporter: exp,
|
||||
output: buf,
|
||||
resource: resource,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,7 +70,7 @@ func (fix testFixture) Output() string {
|
||||
}
|
||||
|
||||
func (fix testFixture) Export(checkpointSet export.CheckpointSet) {
|
||||
err := fix.exporter.Export(fix.ctx, checkpointSet)
|
||||
err := fix.exporter.Export(fix.ctx, fix.resource, checkpointSet)
|
||||
if err != nil {
|
||||
fix.t.Error("export failed: ", err)
|
||||
}
|
||||
@@ -94,7 +96,7 @@ func TestStdoutTimestamp(t *testing.T) {
|
||||
|
||||
before := time.Now()
|
||||
|
||||
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
|
||||
checkpointSet := test.NewCheckpointSet()
|
||||
|
||||
ctx := context.Background()
|
||||
desc := metric.NewDescriptor("test.name", metric.ObserverKind, core.Int64NumberKind)
|
||||
@@ -104,7 +106,7 @@ func TestStdoutTimestamp(t *testing.T) {
|
||||
|
||||
checkpointSet.Add(&desc, lvagg)
|
||||
|
||||
if err := exporter.Export(ctx, checkpointSet); err != nil {
|
||||
if err := exporter.Export(ctx, nil, checkpointSet); err != nil {
|
||||
t.Fatal("Unexpected export error: ", err)
|
||||
}
|
||||
|
||||
@@ -138,9 +140,9 @@ func TestStdoutTimestamp(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStdoutCounterFormat(t *testing.T) {
|
||||
fix := newFixture(t, stdout.Config{})
|
||||
fix := newFixture(t, nil, stdout.Config{})
|
||||
|
||||
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
|
||||
checkpointSet := test.NewCheckpointSet()
|
||||
|
||||
desc := metric.NewDescriptor("test.name", metric.CounterKind, core.Int64NumberKind)
|
||||
cagg := sum.New()
|
||||
@@ -155,9 +157,9 @@ func TestStdoutCounterFormat(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStdoutLastValueFormat(t *testing.T) {
|
||||
fix := newFixture(t, stdout.Config{})
|
||||
fix := newFixture(t, nil, stdout.Config{})
|
||||
|
||||
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
|
||||
checkpointSet := test.NewCheckpointSet()
|
||||
|
||||
desc := metric.NewDescriptor("test.name", metric.ObserverKind, core.Float64NumberKind)
|
||||
lvagg := lastvalue.New()
|
||||
@@ -172,9 +174,9 @@ func TestStdoutLastValueFormat(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStdoutMinMaxSumCount(t *testing.T) {
|
||||
fix := newFixture(t, stdout.Config{})
|
||||
fix := newFixture(t, nil, stdout.Config{})
|
||||
|
||||
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
|
||||
checkpointSet := test.NewCheckpointSet()
|
||||
|
||||
desc := metric.NewDescriptor("test.name", metric.MeasureKind, core.Float64NumberKind)
|
||||
magg := minmaxsumcount.New(&desc)
|
||||
@@ -190,11 +192,11 @@ func TestStdoutMinMaxSumCount(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStdoutMeasureFormat(t *testing.T) {
|
||||
fix := newFixture(t, stdout.Config{
|
||||
fix := newFixture(t, nil, stdout.Config{
|
||||
PrettyPrint: true,
|
||||
})
|
||||
|
||||
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
|
||||
checkpointSet := test.NewCheckpointSet()
|
||||
|
||||
desc := metric.NewDescriptor("test.name", metric.MeasureKind, core.Float64NumberKind)
|
||||
magg := array.New()
|
||||
@@ -246,9 +248,9 @@ func TestStdoutNoData(t *testing.T) {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
fix := newFixture(t, stdout.Config{})
|
||||
fix := newFixture(t, nil, stdout.Config{})
|
||||
|
||||
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
|
||||
checkpointSet := test.NewCheckpointSet()
|
||||
|
||||
magg := tc
|
||||
magg.Checkpoint(fix.ctx, &desc)
|
||||
@@ -263,9 +265,9 @@ func TestStdoutNoData(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStdoutLastValueNotSet(t *testing.T) {
|
||||
fix := newFixture(t, stdout.Config{})
|
||||
fix := newFixture(t, nil, stdout.Config{})
|
||||
|
||||
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
|
||||
checkpointSet := test.NewCheckpointSet()
|
||||
|
||||
desc := metric.NewDescriptor("test.name", metric.ObserverKind, core.Float64NumberKind)
|
||||
lvagg := lastvalue.New()
|
||||
@@ -277,3 +279,55 @@ func TestStdoutLastValueNotSet(t *testing.T) {
|
||||
|
||||
require.Equal(t, `{"updates":null}`, fix.Output())
|
||||
}
|
||||
|
||||
func TestStdoutResource(t *testing.T) {
|
||||
type testCase struct {
|
||||
expect string
|
||||
res *resource.Resource
|
||||
attrs []core.KeyValue
|
||||
}
|
||||
newCase := func(expect string, res *resource.Resource, attrs ...core.KeyValue) testCase {
|
||||
return testCase{
|
||||
expect: expect,
|
||||
res: res,
|
||||
attrs: attrs,
|
||||
}
|
||||
}
|
||||
testCases := []testCase{
|
||||
newCase("R1=V1,R2=V2,A=B,C=D",
|
||||
resource.New(key.String("R1", "V1"), key.String("R2", "V2")),
|
||||
key.String("A", "B"),
|
||||
key.String("C", "D")),
|
||||
newCase("R1=V1,R2=V2",
|
||||
resource.New(key.String("R1", "V1"), key.String("R2", "V2")),
|
||||
),
|
||||
newCase("A=B,C=D",
|
||||
nil,
|
||||
key.String("A", "B"),
|
||||
key.String("C", "D"),
|
||||
),
|
||||
// We explicitly do not de-duplicate between resources
|
||||
// and metric labels in this exporter.
|
||||
newCase("R1=V1,R2=V2,R1=V3,R2=V4",
|
||||
resource.New(key.String("R1", "V1"), key.String("R2", "V2")),
|
||||
key.String("R1", "V3"),
|
||||
key.String("R2", "V4")),
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
fix := newFixture(t, tc.res, stdout.Config{})
|
||||
|
||||
checkpointSet := test.NewCheckpointSet()
|
||||
|
||||
desc := metric.NewDescriptor("test.name", metric.ObserverKind, core.Float64NumberKind)
|
||||
lvagg := lastvalue.New()
|
||||
aggtest.CheckedUpdate(fix.t, lvagg, core.NewFloat64Number(123.456), &desc)
|
||||
lvagg.Checkpoint(fix.ctx, &desc)
|
||||
|
||||
checkpointSet.Add(&desc, lvagg, tc.attrs...)
|
||||
|
||||
fix.Export(checkpointSet)
|
||||
|
||||
require.Equal(t, `{"updates":[{"name":"test.name{`+tc.expect+`}","last":123.456}]}`, fix.Output())
|
||||
}
|
||||
}
|
||||
|
@@ -29,23 +29,26 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
)
|
||||
|
||||
type mapkey struct {
|
||||
desc *metric.Descriptor
|
||||
distinct label.Distinct
|
||||
}
|
||||
|
||||
type CheckpointSet struct {
|
||||
encoder label.Encoder
|
||||
records map[string]export.Record
|
||||
records map[mapkey]export.Record
|
||||
updates []export.Record
|
||||
}
|
||||
|
||||
// NewCheckpointSet returns a test CheckpointSet that new records could be added.
|
||||
// Records are grouped by their encoded labels.
|
||||
func NewCheckpointSet(encoder label.Encoder) *CheckpointSet {
|
||||
func NewCheckpointSet() *CheckpointSet {
|
||||
return &CheckpointSet{
|
||||
encoder: encoder,
|
||||
records: make(map[string]export.Record),
|
||||
records: make(map[mapkey]export.Record),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *CheckpointSet) Reset() {
|
||||
p.records = make(map[string]export.Record)
|
||||
p.records = make(map[mapkey]export.Record)
|
||||
p.updates = nil
|
||||
}
|
||||
|
||||
@@ -56,7 +59,10 @@ func (p *CheckpointSet) Reset() {
|
||||
func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, labels ...core.KeyValue) (agg export.Aggregator, added bool) {
|
||||
elabels := label.NewSet(labels...)
|
||||
|
||||
key := desc.Name() + "_" + elabels.Encoded(p.encoder)
|
||||
key := mapkey{
|
||||
desc: desc,
|
||||
distinct: elabels.Equivalent(),
|
||||
}
|
||||
if record, ok := p.records[key]; ok {
|
||||
return record.Aggregator(), false
|
||||
}
|
||||
|
@@ -62,7 +62,7 @@ type result struct {
|
||||
|
||||
// CheckpointSet transforms all records contained in a checkpoint into
|
||||
// batched OTLP ResourceMetrics.
|
||||
func CheckpointSet(ctx context.Context, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
|
||||
func CheckpointSet(ctx context.Context, resource *resource.Resource, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
|
||||
records, errc := source(ctx, cps)
|
||||
|
||||
// Start a fixed number of goroutines to transform records.
|
||||
@@ -72,7 +72,7 @@ func CheckpointSet(ctx context.Context, cps export.CheckpointSet, numWorkers uin
|
||||
for i := uint(0); i < numWorkers; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
transformer(ctx, records, transformed)
|
||||
transformer(ctx, resource, records, transformed)
|
||||
}()
|
||||
}
|
||||
go func() {
|
||||
@@ -117,7 +117,7 @@ func source(ctx context.Context, cps export.CheckpointSet) (<-chan export.Record
|
||||
|
||||
// transformer transforms records read from the passed in chan into
|
||||
// OTLP Metrics which are sent on the out chan.
|
||||
func transformer(ctx context.Context, in <-chan export.Record, out chan<- result) {
|
||||
func transformer(ctx context.Context, resource *resource.Resource, in <-chan export.Record, out chan<- result) {
|
||||
for r := range in {
|
||||
m, err := Record(r)
|
||||
// Propagate errors, but do not send empty results.
|
||||
@@ -125,7 +125,7 @@ func transformer(ctx context.Context, in <-chan export.Record, out chan<- result
|
||||
continue
|
||||
}
|
||||
res := result{
|
||||
Resource: r.Descriptor().Resource(),
|
||||
Resource: resource,
|
||||
Library: r.Descriptor().LibraryName(),
|
||||
Metric: m,
|
||||
Err: err,
|
||||
|
@@ -31,6 +31,7 @@ import (
|
||||
"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
|
||||
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
type Exporter struct {
|
||||
@@ -211,7 +212,7 @@ func (e *Exporter) Stop() error {
|
||||
// Export implements the "go.opentelemetry.io/otel/sdk/export/metric".Exporter
|
||||
// interface. It transforms and batches metric Records into OTLP Metrics and
|
||||
// transmits them to the configured collector.
|
||||
func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error {
|
||||
func (e *Exporter) Export(parent context.Context, resource *resource.Resource, cps metricsdk.CheckpointSet) error {
|
||||
// Unify the parent context Done signal with the exporter stopCh.
|
||||
ctx, cancel := context.WithCancel(parent)
|
||||
defer cancel()
|
||||
@@ -223,7 +224,7 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e
|
||||
}
|
||||
}(ctx, cancel)
|
||||
|
||||
rms, err := transform.CheckpointSet(ctx, cps, e.c.numWorkers)
|
||||
rms, err := transform.CheckpointSet(ctx, resource, cps, e.c.numWorkers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -74,11 +74,12 @@ func (m checkpointSet) ForEach(fn func(metricsdk.Record) error) error {
|
||||
}
|
||||
|
||||
type record struct {
|
||||
name string
|
||||
mKind metric.Kind
|
||||
nKind core.NumberKind
|
||||
opts []metric.Option
|
||||
labels []core.KeyValue
|
||||
name string
|
||||
mKind metric.Kind
|
||||
nKind core.NumberKind
|
||||
resource *resource.Resource
|
||||
opts []metric.Option
|
||||
labels []core.KeyValue
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -145,14 +146,16 @@ func TestNoGroupingExport(t *testing.T) {
|
||||
"int64-count",
|
||||
metric.CounterKind,
|
||||
core.Int64NumberKind,
|
||||
[]metric.Option{},
|
||||
nil,
|
||||
nil,
|
||||
append(baseKeyValues, cpuKey.Int(1)),
|
||||
},
|
||||
{
|
||||
"int64-count",
|
||||
metric.CounterKind,
|
||||
core.Int64NumberKind,
|
||||
[]metric.Option{},
|
||||
nil,
|
||||
nil,
|
||||
append(baseKeyValues, cpuKey.Int(2)),
|
||||
},
|
||||
},
|
||||
@@ -191,7 +194,8 @@ func TestMeasureMetricGroupingExport(t *testing.T) {
|
||||
"measure",
|
||||
metric.MeasureKind,
|
||||
core.Int64NumberKind,
|
||||
[]metric.Option{},
|
||||
nil,
|
||||
nil,
|
||||
append(baseKeyValues, cpuKey.Int(1)),
|
||||
}
|
||||
expected := []metricpb.ResourceMetrics{
|
||||
@@ -264,7 +268,8 @@ func TestCountInt64MetricGroupingExport(t *testing.T) {
|
||||
"int64-count",
|
||||
metric.CounterKind,
|
||||
core.Int64NumberKind,
|
||||
[]metric.Option{},
|
||||
nil,
|
||||
nil,
|
||||
append(baseKeyValues, cpuKey.Int(1)),
|
||||
}
|
||||
runMetricExportTests(
|
||||
@@ -300,7 +305,8 @@ func TestCountUint64MetricGroupingExport(t *testing.T) {
|
||||
"uint64-count",
|
||||
metric.CounterKind,
|
||||
core.Uint64NumberKind,
|
||||
[]metric.Option{},
|
||||
nil,
|
||||
nil,
|
||||
append(baseKeyValues, cpuKey.Int(1)),
|
||||
}
|
||||
runMetricExportTests(
|
||||
@@ -349,7 +355,8 @@ func TestCountFloat64MetricGroupingExport(t *testing.T) {
|
||||
"float64-count",
|
||||
metric.CounterKind,
|
||||
core.Float64NumberKind,
|
||||
[]metric.Option{},
|
||||
nil,
|
||||
nil,
|
||||
append(baseKeyValues, cpuKey.Int(1)),
|
||||
}
|
||||
runMetricExportTests(
|
||||
@@ -401,28 +408,32 @@ func TestResourceMetricGroupingExport(t *testing.T) {
|
||||
"int64-count",
|
||||
metric.CounterKind,
|
||||
core.Int64NumberKind,
|
||||
[]metric.Option{metric.WithResource(testInstA)},
|
||||
testInstA,
|
||||
nil,
|
||||
append(baseKeyValues, cpuKey.Int(1)),
|
||||
},
|
||||
{
|
||||
"int64-count",
|
||||
metric.CounterKind,
|
||||
core.Int64NumberKind,
|
||||
[]metric.Option{metric.WithResource(testInstA)},
|
||||
testInstA,
|
||||
nil,
|
||||
append(baseKeyValues, cpuKey.Int(1)),
|
||||
},
|
||||
{
|
||||
"int64-count",
|
||||
metric.CounterKind,
|
||||
core.Int64NumberKind,
|
||||
[]metric.Option{metric.WithResource(testInstA)},
|
||||
testInstA,
|
||||
nil,
|
||||
append(baseKeyValues, cpuKey.Int(2)),
|
||||
},
|
||||
{
|
||||
"int64-count",
|
||||
metric.CounterKind,
|
||||
core.Int64NumberKind,
|
||||
[]metric.Option{metric.WithResource(testInstB)},
|
||||
testInstB,
|
||||
nil,
|
||||
append(baseKeyValues, cpuKey.Int(1)),
|
||||
},
|
||||
},
|
||||
@@ -484,8 +495,8 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
|
||||
"int64-count",
|
||||
metric.CounterKind,
|
||||
core.Int64NumberKind,
|
||||
testInstA,
|
||||
[]metric.Option{
|
||||
metric.WithResource(testInstA),
|
||||
metric.WithLibraryName("couting-lib"),
|
||||
},
|
||||
append(baseKeyValues, cpuKey.Int(1)),
|
||||
@@ -494,8 +505,8 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
|
||||
"int64-count",
|
||||
metric.CounterKind,
|
||||
core.Int64NumberKind,
|
||||
testInstA,
|
||||
[]metric.Option{
|
||||
metric.WithResource(testInstA),
|
||||
metric.WithLibraryName("couting-lib"),
|
||||
},
|
||||
append(baseKeyValues, cpuKey.Int(1)),
|
||||
@@ -504,8 +515,8 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
|
||||
"int64-count",
|
||||
metric.CounterKind,
|
||||
core.Int64NumberKind,
|
||||
testInstA,
|
||||
[]metric.Option{
|
||||
metric.WithResource(testInstA),
|
||||
metric.WithLibraryName("couting-lib"),
|
||||
},
|
||||
append(baseKeyValues, cpuKey.Int(2)),
|
||||
@@ -514,8 +525,8 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
|
||||
"int64-count",
|
||||
metric.CounterKind,
|
||||
core.Int64NumberKind,
|
||||
testInstA,
|
||||
[]metric.Option{
|
||||
metric.WithResource(testInstA),
|
||||
metric.WithLibraryName("summing-lib"),
|
||||
},
|
||||
append(baseKeyValues, cpuKey.Int(1)),
|
||||
@@ -524,8 +535,8 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
|
||||
"int64-count",
|
||||
metric.CounterKind,
|
||||
core.Int64NumberKind,
|
||||
testInstB,
|
||||
[]metric.Option{
|
||||
metric.WithResource(testInstB),
|
||||
metric.WithLibraryName("couting-lib"),
|
||||
},
|
||||
append(baseKeyValues, cpuKey.Int(1)),
|
||||
@@ -617,7 +628,8 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
|
||||
exp.metricExporter = msc
|
||||
exp.started = true
|
||||
|
||||
var recs []metricsdk.Record
|
||||
recs := map[label.Distinct][]metricsdk.Record{}
|
||||
resources := map[label.Distinct]*resource.Resource{}
|
||||
for _, r := range rs {
|
||||
desc := metric.NewDescriptor(r.name, r.mKind, r.nKind, r.opts...)
|
||||
labs := label.NewSet(r.labels...)
|
||||
@@ -646,9 +658,14 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
|
||||
}
|
||||
agg.Checkpoint(ctx, &desc)
|
||||
|
||||
recs = append(recs, metricsdk.NewRecord(&desc, &labs, agg))
|
||||
equiv := r.resource.Equivalent()
|
||||
resources[equiv] = r.resource
|
||||
recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, agg))
|
||||
}
|
||||
for equiv, records := range recs {
|
||||
resource := resources[equiv]
|
||||
assert.NoError(t, exp.Export(context.Background(), resource, checkpointSet{records: records}))
|
||||
}
|
||||
assert.NoError(t, exp.Export(context.Background(), checkpointSet{records: recs}))
|
||||
|
||||
// assert.ElementsMatch does not equate nested slices of different order,
|
||||
// therefore this requires the top level slice to be broken down.
|
||||
@@ -697,6 +714,8 @@ func TestEmptyMetricExport(t *testing.T) {
|
||||
exp.metricExporter = msc
|
||||
exp.started = true
|
||||
|
||||
resource := resource.New(key.String("R", "S"))
|
||||
|
||||
for _, test := range []struct {
|
||||
records []metricsdk.Record
|
||||
want []metricpb.ResourceMetrics
|
||||
@@ -711,7 +730,7 @@ func TestEmptyMetricExport(t *testing.T) {
|
||||
},
|
||||
} {
|
||||
msc.Reset()
|
||||
require.NoError(t, exp.Export(context.Background(), checkpointSet{records: test.records}))
|
||||
require.NoError(t, exp.Export(context.Background(), resource, checkpointSet{records: test.records}))
|
||||
assert.Equal(t, test.want, msc.ResourceMetrics())
|
||||
}
|
||||
}
|
||||
|
@@ -20,6 +20,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/core"
|
||||
"go.opentelemetry.io/otel/api/label"
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
// Batcher is responsible for deciding which kind of aggregation to
|
||||
@@ -160,9 +161,12 @@ type Exporter interface {
|
||||
// The Context comes from the controller that initiated
|
||||
// collection.
|
||||
//
|
||||
// The Resource contains common attributes that apply to all
|
||||
// metric events in the SDK.
|
||||
//
|
||||
// The CheckpointSet interface refers to the Batcher that just
|
||||
// completed collection.
|
||||
Export(context.Context, CheckpointSet) error
|
||||
Export(context.Context, *resource.Resource, CheckpointSet) error
|
||||
}
|
||||
|
||||
// CheckpointSet allows a controller to access a complete checkpoint of
|
||||
|
@@ -14,8 +14,6 @@
|
||||
|
||||
package metric
|
||||
|
||||
import "go.opentelemetry.io/otel/sdk/resource"
|
||||
|
||||
// Config contains configuration for an SDK.
|
||||
type Config struct {
|
||||
// ErrorHandler is the function called when the SDK encounters an error.
|
||||
@@ -23,10 +21,6 @@ type Config struct {
|
||||
// This option can be overridden after instantiation of the SDK
|
||||
// with the `SetErrorHandler` method.
|
||||
ErrorHandler ErrorHandler
|
||||
|
||||
// Resource is the OpenTelemetry resource associated with all Meters
|
||||
// created by the SDK.
|
||||
Resource *resource.Resource
|
||||
}
|
||||
|
||||
// Option is the interface that applies the value to a configuration option.
|
||||
@@ -45,14 +39,3 @@ type errorHandlerOption ErrorHandler
|
||||
func (o errorHandlerOption) Apply(config *Config) {
|
||||
config.ErrorHandler = ErrorHandler(o)
|
||||
}
|
||||
|
||||
// WithResource sets the Resource configuration option of a Config.
|
||||
func WithResource(r *resource.Resource) Option {
|
||||
return resourceOption{r}
|
||||
}
|
||||
|
||||
type resourceOption struct{ *resource.Resource }
|
||||
|
||||
func (o resourceOption) Apply(config *Config) {
|
||||
config.Resource = o.Resource
|
||||
}
|
||||
|
@@ -19,9 +19,6 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"go.opentelemetry.io/otel/api/key"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
func TestWithErrorHandler(t *testing.T) {
|
||||
@@ -46,16 +43,3 @@ func TestWithErrorHandler(t *testing.T) {
|
||||
c.ErrorHandler(err2)
|
||||
assert.EqualError(t, *reg, err2.Error())
|
||||
}
|
||||
|
||||
func TestWithResource(t *testing.T) {
|
||||
r := resource.New(key.String("A", "a"))
|
||||
|
||||
c := &Config{}
|
||||
WithResource(r).Apply(c)
|
||||
assert.True(t, r.Equal(c.Resource))
|
||||
|
||||
// Ensure overwriting works.
|
||||
c = &Config{Resource: &resource.Resource{}}
|
||||
WithResource(r).Apply(c)
|
||||
assert.Equal(t, r.Equivalent(), c.Resource.Equivalent())
|
||||
}
|
||||
|
@@ -23,6 +23,7 @@ import (
|
||||
"go.opentelemetry.io/otel/api/metric/registry"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
sdk "go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
// Controller organizes a periodic push of metric data.
|
||||
@@ -30,6 +31,7 @@ type Controller struct {
|
||||
lock sync.Mutex
|
||||
collectLock sync.Mutex
|
||||
sdk *sdk.SDK
|
||||
resource *resource.Resource
|
||||
uniq metric.MeterImpl
|
||||
named map[string]metric.Meter
|
||||
errorHandler sdk.ErrorHandler
|
||||
@@ -77,9 +79,10 @@ func New(batcher export.Batcher, exporter export.Exporter, period time.Duration,
|
||||
opt.Apply(c)
|
||||
}
|
||||
|
||||
impl := sdk.New(batcher, sdk.WithResource(c.Resource), sdk.WithErrorHandler(c.ErrorHandler))
|
||||
impl := sdk.New(batcher, sdk.WithErrorHandler(c.ErrorHandler))
|
||||
return &Controller{
|
||||
sdk: impl,
|
||||
resource: c.Resource,
|
||||
uniq: registry.NewUniqueInstrumentMeterImpl(impl),
|
||||
named: map[string]metric.Meter{},
|
||||
errorHandler: c.ErrorHandler,
|
||||
@@ -175,7 +178,7 @@ func (c *Controller) tick() {
|
||||
mtx: &c.collectLock,
|
||||
delegate: c.batcher.CheckpointSet(),
|
||||
}
|
||||
err := c.exporter.Export(ctx, checkpointSet)
|
||||
err := c.exporter.Export(ctx, c.resource, checkpointSet)
|
||||
c.batcher.FinishedCollection()
|
||||
|
||||
if err != nil {
|
||||
|
@@ -25,13 +25,13 @@ import (
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel/api/label"
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
"go.opentelemetry.io/otel/exporters/metric/test"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
"go.opentelemetry.io/otel/sdk/metric/controller/push"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
type testBatcher struct {
|
||||
@@ -68,7 +68,7 @@ var _ push.Clock = mockClock{}
|
||||
var _ push.Ticker = mockTicker{}
|
||||
|
||||
func newFixture(t *testing.T) testFixture {
|
||||
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
|
||||
checkpointSet := test.NewCheckpointSet()
|
||||
|
||||
batcher := &testBatcher{
|
||||
t: t,
|
||||
@@ -115,7 +115,7 @@ func (b *testBatcher) getCounts() (checkpoints, finishes int) {
|
||||
return b.checkpoints, b.finishes
|
||||
}
|
||||
|
||||
func (e *testExporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
|
||||
func (e *testExporter) Export(_ context.Context, _ *resource.Resource, checkpointSet export.CheckpointSet) error {
|
||||
e.lock.Lock()
|
||||
defer e.lock.Unlock()
|
||||
e.exports++
|
||||
|
@@ -28,7 +28,6 @@ import (
|
||||
api "go.opentelemetry.io/otel/api/metric"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
type (
|
||||
@@ -61,9 +60,6 @@ type (
|
||||
// errorHandler supports delivering errors to the user.
|
||||
errorHandler ErrorHandler
|
||||
|
||||
// resource represents the entity producing telemetry.
|
||||
resource *resource.Resource
|
||||
|
||||
// asyncSortSlice has a single purpose - as a temporary
|
||||
// place for sorting during labels creation to avoid
|
||||
// allocation. It is cleared after use.
|
||||
@@ -323,7 +319,6 @@ func New(batcher export.Batcher, opts ...Option) *SDK {
|
||||
return &SDK{
|
||||
batcher: batcher,
|
||||
errorHandler: c.ErrorHandler,
|
||||
resource: c.Resource,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -467,16 +462,6 @@ func (m *SDK) checkpoint(ctx context.Context, descriptor *metric.Descriptor, rec
|
||||
return 1
|
||||
}
|
||||
|
||||
// Resource returns the Resource this SDK was created with describing the
|
||||
// entity for which it creates instruments for.
|
||||
//
|
||||
// Resource means that the SDK implements the Resourcer interface and
|
||||
// therefore all metric instruments it creates will inherit its
|
||||
// Resource by default unless explicitly overwritten.
|
||||
func (m *SDK) Resource() *resource.Resource {
|
||||
return m.resource
|
||||
}
|
||||
|
||||
// RecordBatch enters a batch of metric events.
|
||||
func (m *SDK) RecordBatch(ctx context.Context, kvs []core.KeyValue, measurements ...api.Measurement) {
|
||||
// Labels will be computed the first time acquireHandle is
|
||||
|
@@ -133,3 +133,13 @@ func (r *Resource) Len() int {
|
||||
}
|
||||
return r.labels.Len()
|
||||
}
|
||||
|
||||
// Encoded returns an encoded representation of the resource by
|
||||
// applying a label encoder. The result is cached by the underlying
|
||||
// label set.
|
||||
func (r *Resource) Encoded(enc label.Encoder) string {
|
||||
if r == nil {
|
||||
return ""
|
||||
}
|
||||
return r.labels.Encoded(enc)
|
||||
}
|
||||
|
Reference in New Issue
Block a user