1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-10-31 00:07:40 +02:00

Remove Labelset (#595)

* Remove LabelSet frmo api/metric

* SDK tests pass

* Restore benchmarks

* All tests pass

* Remove all mentions of LabelSet

* Test RecordBatch

* Batch test

* Improves benchmark (some)

* Move the benchmark to match HEAD

* Align labels for GOARCH=386

* Add alignment test

* Disable the stress test fo GOARCH=386

* Fix bug

* Move atomic fields into their own file

* Add a TODO

* Comments

* Remove metric.Labels(...)

* FTB

Co-authored-by: Liz Fong-Jones <lizf@honeycomb.io>
This commit is contained in:
Joshua MacDonald
2020-03-27 14:06:48 -07:00
committed by GitHub
parent e7a9ba1e2e
commit e8546e3bc5
29 changed files with 436 additions and 477 deletions

View File

@@ -19,6 +19,7 @@ import (
"strings"
"testing"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/global/internal"
"go.opentelemetry.io/otel/api/key"
@@ -90,13 +91,13 @@ func BenchmarkGlobalInt64CounterAddNoSDK(b *testing.B) {
internal.ResetForTest()
ctx := context.Background()
sdk := global.Meter("test")
labs := sdk.Labels(key.String("A", "B"))
labs := []core.KeyValue{key.String("A", "B")}
cnt := Must(sdk).NewInt64Counter("int64.counter")
b.ResetTimer()
for i := 0; i < b.N; i++ {
cnt.Add(ctx, 1, labs)
cnt.Add(ctx, 1, labs...)
}
}
@@ -109,13 +110,13 @@ func BenchmarkGlobalInt64CounterAddWithSDK(b *testing.B) {
global.SetMeterProvider(fix)
labs := sdk.Labels(key.String("A", "B"))
labs := []core.KeyValue{key.String("A", "B")}
cnt := Must(sdk).NewInt64Counter("int64.counter")
b.ResetTimer()
for i := 0; i < b.N; i++ {
cnt.Add(ctx, 1, labs)
cnt.Add(ctx, 1, labs...)
}
}

View File

@@ -39,9 +39,6 @@ import (
// provider. Mutexes in the Provider and Meters ensure that each
// instrument has a delegate before the global provider is set.
//
// LabelSets are implemented by delegating to the Meter instance using
// the metric.LabelSetDelegator interface.
//
// Bound instrument operations are implemented by delegating to the
// instrument after it is registered, with a sync.Once initializer to
// protect against races with Release().
@@ -100,28 +97,17 @@ type AsyncImpler interface {
AsyncImpl() metric.AsyncImpl
}
type labelSet struct {
delegate unsafe.Pointer // (* metric.LabelSet)
meter *meter
value []core.KeyValue
initialize sync.Once
}
type syncHandle struct {
delegate unsafe.Pointer // (*metric.HandleImpl)
inst *syncImpl
labels metric.LabelSet
labels []core.KeyValue
initialize sync.Once
}
var _ metric.Provider = &meterProvider{}
var _ metric.Meter = &meter{}
var _ metric.LabelSet = &labelSet{}
var _ metric.LabelSetDelegate = &labelSet{}
var _ metric.InstrumentImpl = &syncImpl{}
var _ metric.BoundSyncImpl = &syncHandle{}
var _ metric.AsyncImpl = &asyncImpl{}
@@ -254,7 +240,7 @@ func (inst *syncImpl) Implementation() interface{} {
return inst
}
func (inst *syncImpl) Bind(labels metric.LabelSet) metric.BoundSyncImpl {
func (inst *syncImpl) Bind(labels []core.KeyValue) metric.BoundSyncImpl {
if implPtr := (*metric.SyncImpl)(atomic.LoadPointer(&inst.delegate)); implPtr != nil {
return (*implPtr).Bind(labels)
}
@@ -340,13 +326,13 @@ func (obs *asyncImpl) setDelegate(d metric.Meter) {
// Metric updates
func (m *meter) RecordBatch(ctx context.Context, labels metric.LabelSet, measurements ...metric.Measurement) {
func (m *meter) RecordBatch(ctx context.Context, labels []core.KeyValue, measurements ...metric.Measurement) {
if delegatePtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); delegatePtr != nil {
(*delegatePtr).RecordBatch(ctx, labels, measurements...)
}
}
func (inst *syncImpl) RecordOne(ctx context.Context, number core.Number, labels metric.LabelSet) {
func (inst *syncImpl) RecordOne(ctx context.Context, number core.Number, labels []core.KeyValue) {
if instPtr := (*metric.SyncImpl)(atomic.LoadPointer(&inst.delegate)); instPtr != nil {
(*instPtr).RecordOne(ctx, number, labels)
}
@@ -377,35 +363,6 @@ func (bound *syncHandle) RecordOne(ctx context.Context, number core.Number) {
(*implPtr).RecordOne(ctx, number)
}
// LabelSet initialization
func (m *meter) Labels(labels ...core.KeyValue) metric.LabelSet {
return &labelSet{
meter: m,
value: labels,
}
}
func (labels *labelSet) Delegate() metric.LabelSet {
meterPtr := (*metric.Meter)(atomic.LoadPointer(&labels.meter.delegate))
if meterPtr == nil {
// This is technically impossible, provided the global
// Meter is updated after the meters and instruments
// have been delegated.
return labels
}
var implPtr *metric.LabelSet
labels.initialize.Do(func() {
implPtr = new(metric.LabelSet)
*implPtr = (*meterPtr).Labels(labels.value...)
atomic.StorePointer(&labels.delegate, unsafe.Pointer(implPtr))
})
if implPtr == nil {
implPtr = (*metric.LabelSet)(atomic.LoadPointer(&labels.delegate))
}
return (*implPtr)
}
// Constructors
func (m *meter) withName(opts []metric.Option) []metric.Option {
@@ -466,7 +423,6 @@ func AtomicFieldOffsets() map[string]uintptr {
"meter.delegate": unsafe.Offsetof(meter{}.delegate),
"syncImpl.delegate": unsafe.Offsetof(syncImpl{}.delegate),
"asyncImpl.delegate": unsafe.Offsetof(asyncImpl{}.delegate),
"labelSet.delegate": unsafe.Offsetof(labelSet{}.delegate),
"syncHandle.delegate": unsafe.Offsetof(syncHandle{}.delegate),
}
}

View File

@@ -47,7 +47,7 @@ func asStructs(batches []metrictest.Batch) []measured {
r = append(r, measured{
Name: m.Instrument.Descriptor().Name(),
LibraryName: m.Instrument.Descriptor().LibraryName(),
Labels: batch.LabelSet.Labels,
Labels: asMap(batch.Labels...),
Number: m.Number,
})
}
@@ -72,41 +72,38 @@ func TestDirect(t *testing.T) {
ctx := context.Background()
meter1 := global.Meter("test1")
meter2 := global.Meter("test2")
lvals1 := key.String("A", "B")
labels1 := meter1.Labels(lvals1)
lvals2 := key.String("C", "D")
labels2 := meter1.Labels(lvals2)
lvals3 := key.String("E", "F")
labels3 := meter2.Labels(lvals3)
labels1 := []core.KeyValue{key.String("A", "B")}
labels2 := []core.KeyValue{key.String("C", "D")}
labels3 := []core.KeyValue{key.String("E", "F")}
counter := Must(meter1).NewInt64Counter("test.counter")
counter.Add(ctx, 1, labels1)
counter.Add(ctx, 1, labels1)
counter.Add(ctx, 1, labels1...)
counter.Add(ctx, 1, labels1...)
measure := Must(meter1).NewFloat64Measure("test.measure")
measure.Record(ctx, 1, labels1)
measure.Record(ctx, 2, labels1)
measure.Record(ctx, 1, labels1...)
measure.Record(ctx, 2, labels1...)
_ = Must(meter1).RegisterFloat64Observer("test.observer.float", func(result metric.Float64ObserverResult) {
result.Observe(1., labels1)
result.Observe(2., labels2)
result.Observe(1., labels1...)
result.Observe(2., labels2...)
})
_ = Must(meter1).RegisterInt64Observer("test.observer.int", func(result metric.Int64ObserverResult) {
result.Observe(1, labels1)
result.Observe(2, labels2)
result.Observe(1, labels1...)
result.Observe(2, labels2...)
})
second := Must(meter2).NewFloat64Measure("test.second")
second.Record(ctx, 1, labels3)
second.Record(ctx, 2, labels3)
second.Record(ctx, 1, labels3...)
second.Record(ctx, 2, labels3...)
mock, provider := metrictest.NewProvider()
global.SetMeterProvider(provider)
counter.Add(ctx, 1, labels1)
measure.Record(ctx, 3, labels1)
second.Record(ctx, 3, labels3)
counter.Add(ctx, 1, labels1...)
measure.Record(ctx, 3, labels1...)
second.Record(ctx, 3, labels3...)
mock.RunAsyncInstruments()
@@ -117,43 +114,43 @@ func TestDirect(t *testing.T) {
{
Name: "test.counter",
LibraryName: "test1",
Labels: asMap(lvals1),
Labels: asMap(labels1...),
Number: asInt(1),
},
{
Name: "test.measure",
LibraryName: "test1",
Labels: asMap(lvals1),
Labels: asMap(labels1...),
Number: asFloat(3),
},
{
Name: "test.second",
LibraryName: "test2",
Labels: asMap(lvals3),
Labels: asMap(labels3...),
Number: asFloat(3),
},
{
Name: "test.observer.float",
LibraryName: "test1",
Labels: asMap(lvals1),
Labels: asMap(labels1...),
Number: asFloat(1),
},
{
Name: "test.observer.float",
LibraryName: "test1",
Labels: asMap(lvals2),
Labels: asMap(labels2...),
Number: asFloat(2),
},
{
Name: "test.observer.int",
LibraryName: "test1",
Labels: asMap(lvals1),
Labels: asMap(labels1...),
Number: asInt(1),
},
{
Name: "test.observer.int",
LibraryName: "test1",
Labels: asMap(lvals2),
Labels: asMap(labels2...),
Number: asInt(2),
},
},
@@ -168,16 +165,15 @@ func TestBound(t *testing.T) {
// vs. the above, to cover all the instruments.
ctx := context.Background()
glob := global.Meter("test")
lvals1 := key.String("A", "B")
labels1 := glob.Labels(lvals1)
labels1 := []core.KeyValue{key.String("A", "B")}
counter := Must(glob).NewFloat64Counter("test.counter")
boundC := counter.Bind(labels1)
boundC := counter.Bind(labels1...)
boundC.Add(ctx, 1)
boundC.Add(ctx, 1)
measure := Must(glob).NewInt64Measure("test.measure")
boundM := measure.Bind(labels1)
boundM := measure.Bind(labels1...)
boundM.Record(ctx, 1)
boundM.Record(ctx, 2)
@@ -192,13 +188,13 @@ func TestBound(t *testing.T) {
{
Name: "test.counter",
LibraryName: "test",
Labels: asMap(lvals1),
Labels: asMap(labels1...),
Number: asFloat(1),
},
{
Name: "test.measure",
LibraryName: "test",
Labels: asMap(lvals1),
Labels: asMap(labels1...),
Number: asInt(3),
},
},
@@ -213,14 +209,13 @@ func TestUnbind(t *testing.T) {
internal.ResetForTest()
glob := global.Meter("test")
lvals1 := key.New("A").String("B")
labels1 := glob.Labels(lvals1)
labels1 := []core.KeyValue{key.String("A", "B")}
counter := Must(glob).NewFloat64Counter("test.counter")
boundC := counter.Bind(labels1)
boundC := counter.Bind(labels1...)
measure := Must(glob).NewInt64Measure("test.measure")
boundM := measure.Bind(labels1)
boundM := measure.Bind(labels1...)
boundC.Unbind()
boundM.Unbind()
@@ -231,12 +226,11 @@ func TestDefaultSDK(t *testing.T) {
ctx := context.Background()
meter1 := global.Meter("builtin")
lvals1 := key.String("A", "B")
labels1 := meter1.Labels(lvals1)
labels1 := []core.KeyValue{key.String("A", "B")}
counter := Must(meter1).NewInt64Counter("test.builtin")
counter.Add(ctx, 1, labels1)
counter.Add(ctx, 1, labels1)
counter.Add(ctx, 1, labels1...)
counter.Add(ctx, 1, labels1...)
in, out := io.Pipe()
pusher, err := stdout.InstallNewPipeline(stdout.Config{
@@ -247,7 +241,7 @@ func TestDefaultSDK(t *testing.T) {
panic(err)
}
counter.Add(ctx, 1, labels1)
counter.Add(ctx, 1, labels1...)
ch := make(chan string)
go func() {
@@ -270,7 +264,7 @@ func TestUnbindThenRecordOne(t *testing.T) {
meter := global.Meter("test")
counter := Must(meter).NewInt64Counter("test.counter")
boundC := counter.Bind(meter.Labels())
boundC := counter.Bind()
global.SetMeterProvider(provider)
boundC.Unbind()
@@ -312,8 +306,8 @@ func TestErrorInDeferredConstructor(t *testing.T) {
global.SetMeterProvider(sdk)
})
c1.Add(ctx, 1, meter.Labels())
c2.Add(ctx, 2, meter.Labels())
c1.Add(ctx, 1)
c2.Add(ctx, 2)
}
func TestImplementationIndirection(t *testing.T) {

View File

@@ -31,11 +31,6 @@ type Provider interface {
Meter(name string) Meter
}
// LabelSet is an implementation-level interface that represents a
// []core.KeyValue for use as pre-defined labels in the metrics API.
type LabelSet interface {
}
// Config contains some options for metrics of any kind.
type Config struct {
// Description is an optional field describing the metric
@@ -161,12 +156,8 @@ func (d Descriptor) LibraryName() string {
// Meter is an interface to the metrics portion of the OpenTelemetry SDK.
type Meter interface {
// Labels returns a reference to a set of labels that cannot
// be read by the application.
Labels(...core.KeyValue) LabelSet
// RecordBatch atomically records a batch of measurements.
RecordBatch(context.Context, LabelSet, ...Measurement)
RecordBatch(context.Context, []core.KeyValue, ...Measurement)
// All instrument constructors may return an error for
// conditions such as:

View File

@@ -139,9 +139,9 @@ func TestCounter(t *testing.T) {
mockSDK, meter := mockTest.NewMeter()
c := Must(meter).NewFloat64Counter("test.counter.float")
ctx := context.Background()
labels := meter.Labels()
c.Add(ctx, 42, labels)
boundInstrument := c.Bind(labels)
labels := []core.KeyValue{key.String("A", "B")}
c.Add(ctx, 42, labels...)
boundInstrument := c.Bind(labels...)
boundInstrument.Add(ctx, 42)
meter.RecordBatch(ctx, labels, c.Measurement(42))
t.Log("Testing float counter")
@@ -151,9 +151,9 @@ func TestCounter(t *testing.T) {
mockSDK, meter := mockTest.NewMeter()
c := Must(meter).NewInt64Counter("test.counter.int")
ctx := context.Background()
labels := meter.Labels()
c.Add(ctx, 42, labels)
boundInstrument := c.Bind(labels)
labels := []core.KeyValue{key.String("A", "B"), key.String("C", "D")}
c.Add(ctx, 42, labels...)
boundInstrument := c.Bind(labels...)
boundInstrument.Add(ctx, 42)
meter.RecordBatch(ctx, labels, c.Measurement(42))
t.Log("Testing int counter")
@@ -166,9 +166,9 @@ func TestMeasure(t *testing.T) {
mockSDK, meter := mockTest.NewMeter()
m := Must(meter).NewFloat64Measure("test.measure.float")
ctx := context.Background()
labels := meter.Labels()
m.Record(ctx, 42, labels)
boundInstrument := m.Bind(labels)
labels := []core.KeyValue{}
m.Record(ctx, 42, labels...)
boundInstrument := m.Bind(labels...)
boundInstrument.Record(ctx, 42)
meter.RecordBatch(ctx, labels, m.Measurement(42))
t.Log("Testing float measure")
@@ -178,9 +178,9 @@ func TestMeasure(t *testing.T) {
mockSDK, meter := mockTest.NewMeter()
m := Must(meter).NewInt64Measure("test.measure.int")
ctx := context.Background()
labels := meter.Labels()
m.Record(ctx, 42, labels)
boundInstrument := m.Bind(labels)
labels := []core.KeyValue{key.Int("I", 1)}
m.Record(ctx, 42, labels...)
boundInstrument := m.Bind(labels...)
boundInstrument.Record(ctx, 42)
meter.RecordBatch(ctx, labels, m.Measurement(42))
t.Log("Testing int measure")
@@ -190,10 +190,10 @@ func TestMeasure(t *testing.T) {
func TestObserver(t *testing.T) {
{
labels := []core.KeyValue{key.String("O", "P")}
mockSDK, meter := mockTest.NewMeter()
labels := meter.Labels()
o := Must(meter).RegisterFloat64Observer("test.observer.float", func(result metric.Float64ObserverResult) {
result.Observe(42, labels)
result.Observe(42, labels...)
})
t.Log("Testing float observer")
@@ -201,10 +201,10 @@ func TestObserver(t *testing.T) {
checkObserverBatch(t, labels, mockSDK, core.Float64NumberKind, o.AsyncImpl())
}
{
labels := []core.KeyValue{}
mockSDK, meter := mockTest.NewMeter()
labels := meter.Labels()
o := Must(meter).RegisterInt64Observer("test.observer.int", func(result metric.Int64ObserverResult) {
result.Observe(42, labels)
result.Observe(42, labels...)
})
t.Log("Testing int observer")
mockSDK.RunAsyncInstruments()
@@ -212,30 +212,21 @@ func TestObserver(t *testing.T) {
}
}
func checkBatches(t *testing.T, ctx context.Context, labels metric.LabelSet, mock *mockTest.MeterImpl, kind core.NumberKind, instrument metric.InstrumentImpl) {
func checkBatches(t *testing.T, ctx context.Context, labels []core.KeyValue, mock *mockTest.MeterImpl, kind core.NumberKind, instrument metric.InstrumentImpl) {
t.Helper()
if len(mock.MeasurementBatches) != 3 {
t.Errorf("Expected 3 recorded measurement batches, got %d", len(mock.MeasurementBatches))
}
ourInstrument := instrument.Implementation().(*mockTest.Sync)
ourLabelSet := labels.(*mockTest.LabelSet)
minLen := 3
if minLen > len(mock.MeasurementBatches) {
minLen = len(mock.MeasurementBatches)
}
for i := 0; i < minLen; i++ {
got := mock.MeasurementBatches[i]
for i, got := range mock.MeasurementBatches {
if got.Ctx != ctx {
d := func(c context.Context) string {
return fmt.Sprintf("(ptr: %p, ctx %#v)", c, c)
}
t.Errorf("Wrong recorded context in batch %d, expected %s, got %s", i, d(ctx), d(got.Ctx))
}
if got.LabelSet != ourLabelSet {
d := func(l *mockTest.LabelSet) string {
return fmt.Sprintf("(ptr: %p, labels %#v)", l, l.Labels)
}
t.Errorf("Wrong recorded label set in batch %d, expected %s, got %s", i, d(ourLabelSet), d(got.LabelSet))
if !assert.Equal(t, got.Labels, labels) {
t.Errorf("Wrong recorded label set in batch %d, expected %v, got %v", i, labels, got.Labels)
}
if len(got.Measurements) != 1 {
t.Errorf("Expected 1 measurement in batch %d, got %d", i, len(got.Measurements))
@@ -261,7 +252,7 @@ func checkBatches(t *testing.T, ctx context.Context, labels metric.LabelSet, moc
}
}
func checkObserverBatch(t *testing.T, labels metric.LabelSet, mock *mockTest.MeterImpl, kind core.NumberKind, observer metric.AsyncImpl) {
func checkObserverBatch(t *testing.T, labels []core.KeyValue, mock *mockTest.MeterImpl, kind core.NumberKind, observer metric.AsyncImpl) {
t.Helper()
assert.Len(t, mock.MeasurementBatches, 1)
if len(mock.MeasurementBatches) < 1 {
@@ -271,9 +262,8 @@ func checkObserverBatch(t *testing.T, labels metric.LabelSet, mock *mockTest.Met
if !assert.NotNil(t, o) {
return
}
ourLabelSet := labels.(*mockTest.LabelSet)
got := mock.MeasurementBatches[0]
assert.Equal(t, ourLabelSet, got.LabelSet)
assert.Equal(t, labels, got.Labels)
assert.Len(t, got.Measurements, 1)
if len(got.Measurements) < 1 {
return
@@ -301,18 +291,14 @@ type testWrappedMeter struct {
var _ metric.MeterImpl = testWrappedMeter{}
func (testWrappedMeter) Labels(...core.KeyValue) metric.LabelSet {
return nil
}
func (testWrappedMeter) RecordBatch(context.Context, metric.LabelSet, ...metric.Measurement) {
func (testWrappedMeter) RecordBatch(context.Context, []core.KeyValue, ...metric.Measurement) {
}
func (testWrappedMeter) NewSyncInstrument(_ metric.Descriptor) (metric.SyncImpl, error) {
return nil, nil
}
func (testWrappedMeter) NewAsyncInstrument(_ metric.Descriptor, _ func(func(core.Number, metric.LabelSet))) (metric.AsyncImpl, error) {
func (testWrappedMeter) NewAsyncInstrument(_ metric.Descriptor, _ func(func(core.Number, []core.KeyValue))) (metric.AsyncImpl, error) {
return nil, errors.New("Test wrap error")
}

View File

@@ -35,7 +35,7 @@ type asyncInstrument struct {
var ErrSDKReturnedNilImpl = errors.New("SDK returned a nil implementation")
func (s syncInstrument) bind(labels LabelSet) syncBoundInstrument {
func (s syncInstrument) bind(labels []core.KeyValue) syncBoundInstrument {
return newSyncBoundInstrument(s.instrument.Bind(labels))
}
@@ -47,7 +47,7 @@ func (s syncInstrument) int64Measurement(value int64) Measurement {
return newMeasurement(s.instrument, core.NewInt64Number(value))
}
func (s syncInstrument) directRecord(ctx context.Context, number core.Number, labels LabelSet) {
func (s syncInstrument) directRecord(ctx context.Context, number core.Number, labels []core.KeyValue) {
s.instrument.RecordOne(ctx, number, labels)
}

View File

@@ -51,7 +51,7 @@ type BoundInt64Counter struct {
// If the labels do not contain a value for the key specified in the
// counter with the WithKeys option, then the missing value will be
// treated as unspecified.
func (c Float64Counter) Bind(labels LabelSet) (h BoundFloat64Counter) {
func (c Float64Counter) Bind(labels ...core.KeyValue) (h BoundFloat64Counter) {
h.syncBoundInstrument = c.bind(labels)
return
}
@@ -63,7 +63,7 @@ func (c Float64Counter) Bind(labels LabelSet) (h BoundFloat64Counter) {
// If the labels do not contain a value for the key specified in the
// counter with the WithKeys option, then the missing value will be
// treated as unspecified.
func (c Int64Counter) Bind(labels LabelSet) (h BoundInt64Counter) {
func (c Int64Counter) Bind(labels ...core.KeyValue) (h BoundInt64Counter) {
h.syncBoundInstrument = c.bind(labels)
return
}
@@ -87,7 +87,7 @@ func (c Int64Counter) Measurement(value int64) Measurement {
// If the labels do not contain a value for the key specified in the
// counter with the WithKeys option, then the missing value will be
// treated as unspecified.
func (c Float64Counter) Add(ctx context.Context, value float64, labels LabelSet) {
func (c Float64Counter) Add(ctx context.Context, value float64, labels ...core.KeyValue) {
c.directRecord(ctx, core.NewFloat64Number(value), labels)
}
@@ -98,7 +98,7 @@ func (c Float64Counter) Add(ctx context.Context, value float64, labels LabelSet)
// If the labels do not contain a value for the key specified in the
// counter with the WithKeys option, then the missing value will be
// treated as unspecified.
func (c Int64Counter) Add(ctx context.Context, value int64, labels LabelSet) {
func (c Int64Counter) Add(ctx context.Context, value int64, labels ...core.KeyValue) {
c.directRecord(ctx, core.NewInt64Number(value), labels)
}

View File

@@ -26,11 +26,7 @@
// The primary object that handles metrics is Meter. Meter can be
// obtained from Provider. The implementations of the Meter and
// Provider are provided by SDK. Normally, the Meter is used directly
// only for the instrument creation, LabelSet generation and batch
// recording.
//
// LabelSet is a set of keys and values that are in a suitable,
// optimized form to be used by Meter.
// only for the instrument creation and batch recording.
//
// Counters are instruments that are reporting a quantity or a sum. An
// example could be bank account balance or bytes downloaded. Counters

View File

@@ -51,7 +51,7 @@ type BoundInt64Measure struct {
// If the labels do not contain a value for the key specified in the
// measure with the WithKeys option, then the missing value will be
// treated as unspecified.
func (c Float64Measure) Bind(labels LabelSet) (h BoundFloat64Measure) {
func (c Float64Measure) Bind(labels ...core.KeyValue) (h BoundFloat64Measure) {
h.syncBoundInstrument = c.bind(labels)
return
}
@@ -63,7 +63,7 @@ func (c Float64Measure) Bind(labels LabelSet) (h BoundFloat64Measure) {
// If the labels do not contain a value for the key specified in the
// measure with the WithKeys option, then the missing value will be
// treated as unspecified.
func (c Int64Measure) Bind(labels LabelSet) (h BoundInt64Measure) {
func (c Int64Measure) Bind(labels ...core.KeyValue) (h BoundInt64Measure) {
h.syncBoundInstrument = c.bind(labels)
return
}
@@ -87,7 +87,7 @@ func (c Int64Measure) Measurement(value int64) Measurement {
// If the labels do not contain a value for the key specified in the
// measure with the WithKeys option, then the missing value will be
// treated as unspecified.
func (c Float64Measure) Record(ctx context.Context, value float64, labels LabelSet) {
func (c Float64Measure) Record(ctx context.Context, value float64, labels ...core.KeyValue) {
c.directRecord(ctx, core.NewFloat64Number(value), labels)
}
@@ -98,7 +98,7 @@ func (c Float64Measure) Record(ctx context.Context, value float64, labels LabelS
// If the labels do not contain a value for the key specified in the
// measure with the WithKeys option, then the missing value will be
// treated as unspecified.
func (c Int64Measure) Record(ctx context.Context, value int64, labels LabelSet) {
func (c Int64Measure) Record(ctx context.Context, value int64, labels ...core.KeyValue) {
c.directRecord(ctx, core.NewInt64Number(value), labels)
}

View File

@@ -23,7 +23,6 @@ import (
type NoopProvider struct{}
type NoopMeter struct{}
type noopLabelSet struct{}
type noopInstrument struct{}
type noopBoundInstrument struct{}
type NoopSync struct{ noopInstrument }
@@ -33,7 +32,6 @@ var _ Provider = NoopProvider{}
var _ Meter = NoopMeter{}
var _ SyncImpl = NoopSync{}
var _ BoundSyncImpl = noopBoundInstrument{}
var _ LabelSet = noopLabelSet{}
var _ AsyncImpl = NoopAsync{}
func (NoopProvider) Meter(name string) Meter {
@@ -54,18 +52,14 @@ func (noopBoundInstrument) RecordOne(context.Context, core.Number) {
func (noopBoundInstrument) Unbind() {
}
func (NoopSync) Bind(LabelSet) BoundSyncImpl {
func (NoopSync) Bind([]core.KeyValue) BoundSyncImpl {
return noopBoundInstrument{}
}
func (NoopSync) RecordOne(context.Context, core.Number, LabelSet) {
func (NoopSync) RecordOne(context.Context, core.Number, []core.KeyValue) {
}
func (NoopMeter) Labels(...core.KeyValue) LabelSet {
return noopLabelSet{}
}
func (NoopMeter) RecordBatch(context.Context, LabelSet, ...Measurement) {
func (NoopMeter) RecordBatch(context.Context, []core.KeyValue, ...Measurement) {
}
func (NoopMeter) NewInt64Counter(string, ...Option) (Int64Counter, error) {

View File

@@ -14,16 +14,18 @@
package metric
import "go.opentelemetry.io/otel/api/core"
// Int64ObserverResult is an interface for reporting integral
// observations.
type Int64ObserverResult interface {
Observe(value int64, labels LabelSet)
Observe(value int64, labels ...core.KeyValue)
}
// Float64ObserverResult is an interface for reporting floating point
// observations.
type Float64ObserverResult interface {
Observe(value float64, labels LabelSet)
Observe(value float64, labels ...core.KeyValue)
}
// Int64ObserverCallback is a type of callback that integral

View File

@@ -53,13 +53,8 @@ func NewUniqueInstrumentMeterImpl(impl metric.MeterImpl) metric.MeterImpl {
}
}
// Labels implements metric.MeterImpl.
func (u *uniqueInstrumentMeterImpl) Labels(kvs ...core.KeyValue) metric.LabelSet {
return u.impl.Labels(kvs...)
}
// RecordBatch implements metric.MeterImpl.
func (u *uniqueInstrumentMeterImpl) RecordBatch(ctx context.Context, labels metric.LabelSet, ms ...metric.Measurement) {
func (u *uniqueInstrumentMeterImpl) RecordBatch(ctx context.Context, labels []core.KeyValue, ms ...metric.Measurement) {
u.impl.RecordBatch(ctx, labels, ms...)
}
@@ -130,7 +125,7 @@ func (u *uniqueInstrumentMeterImpl) NewSyncInstrument(descriptor metric.Descript
// NewAsyncInstrument implements metric.MeterImpl.
func (u *uniqueInstrumentMeterImpl) NewAsyncInstrument(
descriptor metric.Descriptor,
callback func(func(core.Number, metric.LabelSet)),
callback func(func(core.Number, []core.KeyValue)),
) (metric.AsyncImpl, error) {
u.lock.Lock()
defer u.lock.Unlock()

View File

@@ -26,12 +26,8 @@ import (
// re-implement the API's type-safe interfaces. Helpers provided in
// this package will construct a `Meter` given a `MeterImpl`.
type MeterImpl interface {
// Labels returns a reference to a set of labels that cannot
// be read by the application.
Labels(...core.KeyValue) LabelSet
// RecordBatch atomically records a batch of measurements.
RecordBatch(context.Context, LabelSet, ...Measurement)
RecordBatch(context.Context, []core.KeyValue, ...Measurement)
// NewSyncInstrument returns a newly constructed
// synchronous instrument implementation or an error, should
@@ -43,19 +39,10 @@ type MeterImpl interface {
// one occur.
NewAsyncInstrument(
descriptor Descriptor,
callback func(func(core.Number, LabelSet)),
callback func(func(core.Number, []core.KeyValue)),
) (AsyncImpl, error)
}
// LabelSetDelegate is a general-purpose delegating implementation of
// the LabelSet interface. This is implemented by the default
// Provider returned by api/global.SetMeterProvider(), and should be
// tested for by implementations before converting a LabelSet to their
// private concrete type.
type LabelSetDelegate interface {
Delegate() LabelSet
}
// InstrumentImpl is a common interface for synchronous and
// asynchronous instruments.
type InstrumentImpl interface {
@@ -75,10 +62,10 @@ type SyncImpl interface {
// Bind creates an implementation-level bound instrument,
// binding a label set with this instrument implementation.
Bind(labels LabelSet) BoundSyncImpl
Bind(labels []core.KeyValue) BoundSyncImpl
// RecordOne captures a single synchronous metric event.
RecordOne(ctx context.Context, number core.Number, labels LabelSet)
RecordOne(ctx context.Context, number core.Number, labels []core.KeyValue)
}
// BoundSyncImpl is the implementation-level interface to a
@@ -111,13 +98,13 @@ type wrappedMeterImpl struct {
// int64ObserverResult is an adapter for int64-valued asynchronous
// callbacks.
type int64ObserverResult struct {
observe func(core.Number, LabelSet)
observe func(core.Number, []core.KeyValue)
}
// float64ObserverResult is an adapter for float64-valued asynchronous
// callbacks.
type float64ObserverResult struct {
observe func(core.Number, LabelSet)
observe func(core.Number, []core.KeyValue)
}
var (
@@ -167,11 +154,7 @@ func WrapMeterImpl(impl MeterImpl, libraryName string) Meter {
}
}
func (m *wrappedMeterImpl) Labels(labels ...core.KeyValue) LabelSet {
return m.impl.Labels(labels...)
}
func (m *wrappedMeterImpl) RecordBatch(ctx context.Context, ls LabelSet, ms ...Measurement) {
func (m *wrappedMeterImpl) RecordBatch(ctx context.Context, ls []core.KeyValue, ms ...Measurement) {
m.impl.RecordBatch(ctx, ls, ms...)
}
@@ -238,7 +221,7 @@ func WrapFloat64MeasureInstrument(syncInst SyncImpl, err error) (Float64Measure,
return Float64Measure{syncInstrument: common}, err
}
func (m *wrappedMeterImpl) newAsync(name string, mkind Kind, nkind core.NumberKind, opts []Option, callback func(func(core.Number, LabelSet))) (AsyncImpl, error) {
func (m *wrappedMeterImpl) newAsync(name string, mkind Kind, nkind core.NumberKind, opts []Option, callback func(func(core.Number, []core.KeyValue))) (AsyncImpl, error) {
opts = insertResource(m.impl, opts)
desc := NewDescriptor(name, mkind, nkind, opts...)
desc.config.LibraryName = m.libraryName
@@ -251,7 +234,7 @@ func (m *wrappedMeterImpl) RegisterInt64Observer(name string, callback Int64Obse
}
return WrapInt64ObserverInstrument(
m.newAsync(name, ObserverKind, core.Int64NumberKind, opts,
func(observe func(core.Number, LabelSet)) {
func(observe func(core.Number, []core.KeyValue)) {
// Note: this memory allocation could be avoided by
// using a pointer to this object and mutating it
// on each collection interval.
@@ -274,7 +257,7 @@ func (m *wrappedMeterImpl) RegisterFloat64Observer(name string, callback Float64
}
return WrapFloat64ObserverInstrument(
m.newAsync(name, ObserverKind, core.Float64NumberKind, opts,
func(observe func(core.Number, LabelSet)) {
func(observe func(core.Number, []core.KeyValue)) {
callback(float64ObserverResult{observe})
}))
}
@@ -288,10 +271,10 @@ func WrapFloat64ObserverInstrument(asyncInst AsyncImpl, err error) (Float64Obser
return Float64Observer{asyncInstrument: common}, err
}
func (io int64ObserverResult) Observe(value int64, labels LabelSet) {
func (io int64ObserverResult) Observe(value int64, labels ...core.KeyValue) {
io.observe(core.NewInt64Number(value), labels)
}
func (fo float64ObserverResult) Observe(value float64, labels LabelSet) {
func (fo float64ObserverResult) Observe(value float64, labels ...core.KeyValue) {
fo.observe(core.NewFloat64Number(value), labels)
}

View File

@@ -72,10 +72,10 @@ func main() {
tracer := global.Tracer("ex.com/basic")
meter := global.Meter("ex.com/basic")
commonLabels := meter.Labels(lemonsKey.Int(10), key.String("A", "1"), key.String("B", "2"), key.String("C", "3"))
commonLabels := []core.KeyValue{lemonsKey.Int(10), key.String("A", "1"), key.String("B", "2"), key.String("C", "3")}
oneMetricCB := func(result metric.Float64ObserverResult) {
result.Observe(1, commonLabels)
result.Observe(1, commonLabels...)
}
_ = metric.Must(meter).RegisterFloat64Observer("ex.com.one", oneMetricCB,
metric.WithKeys(fooKey, barKey, lemonsKey),
@@ -91,7 +91,7 @@ func main() {
barKey.String("bar1"),
)
measure := measureTwo.Bind(commonLabels)
measure := measureTwo.Bind(commonLabels...)
defer measure.Unbind()
err := tracer.WithSpan(ctx, "operation", func(ctx context.Context) error {

View File

@@ -21,6 +21,7 @@ import (
"sync"
"time"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/metric"
@@ -53,13 +54,13 @@ func main() {
meter := global.Meter("ex.com/basic")
observerLock := new(sync.RWMutex)
observerValueToReport := new(float64)
observerLabelSetToReport := new(metric.LabelSet)
observerLabelsToReport := new([]core.KeyValue)
cb := func(result metric.Float64ObserverResult) {
(*observerLock).RLock()
value := *observerValueToReport
labelset := *observerLabelSetToReport
labels := *observerLabelsToReport
(*observerLock).RUnlock()
result.Observe(value, labelset)
result.Observe(value, labels...)
}
_ = metric.Must(meter).RegisterFloat64Observer("ex.com.one", cb,
metric.WithKeys(fooKey, barKey, lemonsKey),
@@ -69,14 +70,14 @@ func main() {
measureTwo := metric.Must(meter).NewFloat64Measure("ex.com.two", metric.WithKeys(key.New("A")))
measureThree := metric.Must(meter).NewFloat64Counter("ex.com.three")
commonLabels := meter.Labels(lemonsKey.Int(10), key.String("A", "1"), key.String("B", "2"), key.String("C", "3"))
notSoCommonLabels := meter.Labels(lemonsKey.Int(13))
commonLabels := []core.KeyValue{lemonsKey.Int(10), key.String("A", "1"), key.String("B", "2"), key.String("C", "3")}
notSoCommonLabels := []core.KeyValue{lemonsKey.Int(13)}
ctx := context.Background()
(*observerLock).Lock()
*observerValueToReport = 1.0
*observerLabelSetToReport = &commonLabels
*observerLabelsToReport = commonLabels
(*observerLock).Unlock()
meter.RecordBatch(
ctx,
@@ -89,7 +90,7 @@ func main() {
(*observerLock).Lock()
*observerValueToReport = 1.0
*observerLabelSetToReport = &notSoCommonLabels
*observerLabelsToReport = notSoCommonLabels
(*observerLock).Unlock()
meter.RecordBatch(
ctx,
@@ -102,7 +103,7 @@ func main() {
(*observerLock).Lock()
*observerValueToReport = 13.0
*observerLabelSetToReport = &commonLabels
*observerLabelsToReport = commonLabels
(*observerLock).Unlock()
meter.RecordBatch(
ctx,

View File

@@ -287,9 +287,9 @@ func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregator.D
ch <- m
}
func (c *collector) toDesc(metric *export.Record) *prometheus.Desc {
desc := metric.Descriptor()
labels := labelsKeys(metric.Labels())
func (c *collector) toDesc(record *export.Record) *prometheus.Desc {
desc := record.Descriptor()
labels := labelsKeys(record.Labels())
return prometheus.NewDesc(sanitize(desc.Name()), desc.Description(), labels, nil)
}

View File

@@ -19,6 +19,7 @@ import (
"log"
"time"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/stdout"
@@ -42,9 +43,9 @@ func ExampleNewExportPipeline() {
// Create and update a single counter:
counter := metric.Must(meter).NewInt64Counter("a.counter", metric.WithKeys(key))
labels := meter.Labels(key.String("value"))
labels := []core.KeyValue{key.String("value")}
counter.Add(ctx, 100, labels)
counter.Add(ctx, 100, labels...)
// Output:
// {

View File

@@ -34,7 +34,7 @@ type CheckpointSet struct {
}
// NewCheckpointSet returns a test CheckpointSet that new records could be added.
// Records are grouped by their LabelSet.
// Records are grouped by their encoded labels.
func NewCheckpointSet(encoder export.LabelEncoder) *CheckpointSet {
return &CheckpointSet{
encoder: encoder,
@@ -49,7 +49,7 @@ func (p *CheckpointSet) Reset() {
// Add a new descriptor to a Checkpoint.
//
// If there is an existing record with the same descriptor and LabelSet
// If there is an existing record with the same descriptor and labels,
// the stored aggregator will be returned and should be merged.
func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, labels ...core.KeyValue) (agg export.Aggregator, added bool) {
elabels := export.NewSimpleLabels(p.encoder, labels...)

View File

@@ -27,6 +27,7 @@ import (
metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/metric"
metricapi "go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/otlp"
@@ -117,7 +118,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
ctx := context.Background()
meter := pusher.Meter("test-meter")
labels := meter.Labels(core.Key("test").Bool(true))
labels := []core.KeyValue{key.Bool("test", true)}
type data struct {
iKind metric.Kind
@@ -137,18 +138,18 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
case metric.CounterKind:
switch data.nKind {
case core.Int64NumberKind:
metricapi.Must(meter).NewInt64Counter(name).Add(ctx, data.val, labels)
metricapi.Must(meter).NewInt64Counter(name).Add(ctx, data.val, labels...)
case core.Float64NumberKind:
metricapi.Must(meter).NewFloat64Counter(name).Add(ctx, float64(data.val), labels)
metricapi.Must(meter).NewFloat64Counter(name).Add(ctx, float64(data.val), labels...)
default:
assert.Failf(t, "unsupported number testing kind", data.nKind.String())
}
case metric.MeasureKind:
switch data.nKind {
case core.Int64NumberKind:
metricapi.Must(meter).NewInt64Measure(name).Record(ctx, data.val, labels)
metricapi.Must(meter).NewInt64Measure(name).Record(ctx, data.val, labels...)
case core.Float64NumberKind:
metricapi.Must(meter).NewFloat64Measure(name).Record(ctx, float64(data.val), labels)
metricapi.Must(meter).NewFloat64Measure(name).Record(ctx, float64(data.val), labels...)
default:
assert.Failf(t, "unsupported number testing kind", data.nKind.String())
}
@@ -156,12 +157,12 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption)
switch data.nKind {
case core.Int64NumberKind:
callback := func(v int64) metricapi.Int64ObserverCallback {
return metricapi.Int64ObserverCallback(func(result metricapi.Int64ObserverResult) { result.Observe(v, labels) })
return metricapi.Int64ObserverCallback(func(result metricapi.Int64ObserverResult) { result.Observe(v, labels...) })
}(data.val)
metricapi.Must(meter).RegisterInt64Observer(name, callback)
case core.Float64NumberKind:
callback := func(v float64) metricapi.Float64ObserverCallback {
return metricapi.Float64ObserverCallback(func(result metricapi.Float64ObserverResult) { result.Observe(v, labels) })
return metricapi.Float64ObserverCallback(func(result metricapi.Float64ObserverResult) { result.Observe(v, labels...) })
}(float64(data.val))
metricapi.Must(meter).RegisterFloat64Observer(name, callback)
default:

View File

@@ -27,19 +27,14 @@ import (
type (
Handle struct {
Instrument *Sync
LabelSet *LabelSet
}
LabelSet struct {
Impl *MeterImpl
Labels map[core.Key]core.Value
Labels []core.KeyValue
}
Batch struct {
// Measurement needs to be aligned for 64-bit atomic operations.
Measurements []Measurement
Ctx context.Context
LabelSet *LabelSet
Labels []core.KeyValue
LibraryName string
}
@@ -69,7 +64,7 @@ type (
Async struct {
Instrument
callback func(func(core.Number, apimetric.LabelSet))
callback func(func(core.Number, []core.KeyValue))
}
Sync struct {
@@ -80,7 +75,6 @@ type (
var (
_ apimetric.SyncImpl = &Sync{}
_ apimetric.BoundSyncImpl = &Handle{}
_ apimetric.LabelSet = &LabelSet{}
_ apimetric.MeterImpl = &MeterImpl{}
_ apimetric.AsyncImpl = &Async{}
)
@@ -97,32 +91,26 @@ func (s *Sync) Implementation() interface{} {
return s
}
func (s *Sync) Bind(labels apimetric.LabelSet) apimetric.BoundSyncImpl {
if ld, ok := labels.(apimetric.LabelSetDelegate); ok {
labels = ld.Delegate()
}
func (s *Sync) Bind(labels []core.KeyValue) apimetric.BoundSyncImpl {
return &Handle{
Instrument: s,
LabelSet: labels.(*LabelSet),
Labels: labels,
}
}
func (s *Sync) RecordOne(ctx context.Context, number core.Number, labels apimetric.LabelSet) {
if ld, ok := labels.(apimetric.LabelSetDelegate); ok {
labels = ld.Delegate()
}
s.meter.doRecordSingle(ctx, labels.(*LabelSet), s, number)
func (s *Sync) RecordOne(ctx context.Context, number core.Number, labels []core.KeyValue) {
s.meter.doRecordSingle(ctx, labels, s, number)
}
func (h *Handle) RecordOne(ctx context.Context, number core.Number) {
h.Instrument.meter.doRecordSingle(ctx, h.LabelSet, h.Instrument, number)
h.Instrument.meter.doRecordSingle(ctx, h.Labels, h.Instrument, number)
}
func (h *Handle) Unbind() {
}
func (m *MeterImpl) doRecordSingle(ctx context.Context, labelSet *LabelSet, instrument apimetric.InstrumentImpl, number core.Number) {
m.recordMockBatch(ctx, labelSet, Measurement{
func (m *MeterImpl) doRecordSingle(ctx context.Context, labels []core.KeyValue, instrument apimetric.InstrumentImpl, number core.Number) {
m.recordMockBatch(ctx, labels, Measurement{
Instrument: instrument,
Number: number,
})
@@ -155,17 +143,6 @@ func NewMeter() (*MeterImpl, apimetric.Meter) {
return impl, p.Meter("mock")
}
func (m *MeterImpl) Labels(labels ...core.KeyValue) apimetric.LabelSet {
ul := make(map[core.Key]core.Value)
for _, kv := range labels {
ul[kv.Key] = kv.Value
}
return &LabelSet{
Impl: m,
Labels: ul,
}
}
func (m *MeterImpl) NewSyncInstrument(descriptor metric.Descriptor) (apimetric.SyncImpl, error) {
return &Sync{
Instrument{
@@ -175,7 +152,7 @@ func (m *MeterImpl) NewSyncInstrument(descriptor metric.Descriptor) (apimetric.S
}, nil
}
func (m *MeterImpl) NewAsyncInstrument(descriptor metric.Descriptor, callback func(func(core.Number, apimetric.LabelSet))) (apimetric.AsyncImpl, error) {
func (m *MeterImpl) NewAsyncInstrument(descriptor metric.Descriptor, callback func(func(core.Number, []core.KeyValue))) (apimetric.AsyncImpl, error) {
a := &Async{
Instrument: Instrument{
descriptor: descriptor,
@@ -187,8 +164,7 @@ func (m *MeterImpl) NewAsyncInstrument(descriptor metric.Descriptor, callback fu
return a, nil
}
func (m *MeterImpl) RecordBatch(ctx context.Context, labels apimetric.LabelSet, measurements ...apimetric.Measurement) {
ourLabelSet := labels.(*LabelSet)
func (m *MeterImpl) RecordBatch(ctx context.Context, labels []core.KeyValue, measurements ...apimetric.Measurement) {
mm := make([]Measurement, len(measurements))
for i := 0; i < len(measurements); i++ {
m := measurements[i]
@@ -197,26 +173,21 @@ func (m *MeterImpl) RecordBatch(ctx context.Context, labels apimetric.LabelSet,
Number: m.Number(),
}
}
m.recordMockBatch(ctx, ourLabelSet, mm...)
m.recordMockBatch(ctx, labels, mm...)
}
func (m *MeterImpl) recordMockBatch(ctx context.Context, labelSet *LabelSet, measurements ...Measurement) {
func (m *MeterImpl) recordMockBatch(ctx context.Context, labels []core.KeyValue, measurements ...Measurement) {
m.MeasurementBatches = append(m.MeasurementBatches, Batch{
Ctx: ctx,
LabelSet: labelSet,
Labels: labels,
Measurements: measurements,
})
}
func (m *MeterImpl) RunAsyncInstruments() {
for _, observer := range m.AsyncInstruments {
observer.callback(func(n core.Number, labels apimetric.LabelSet) {
if ld, ok := labels.(apimetric.LabelSetDelegate); ok {
labels = ld.Delegate()
}
m.doRecordSingle(context.Background(), labels.(*LabelSet), observer, n)
observer.callback(func(n core.Number, labels []core.KeyValue) {
m.doRecordSingle(context.Background(), labels, observer, n)
})
}
}

View File

@@ -17,24 +17,21 @@ package metric
import (
"os"
"testing"
"unsafe"
ottest "go.opentelemetry.io/otel/internal/testing"
)
// Ensure struct alignment prior to running tests.
func TestMain(m *testing.M) {
fields := []ottest.FieldOffset{
{
Name: "record.refMapped.value",
Offset: unsafe.Offsetof(record{}.refMapped.value),
},
{
Name: "record.modified",
Offset: unsafe.Offsetof(record{}.modified),
},
offsets := AtomicFieldOffsets()
var r []ottest.FieldOffset
for name, offset := range offsets {
r = append(r, ottest.FieldOffset{
Name: name,
Offset: offset,
})
}
if !ottest.Aligned8Byte(fields, os.Stderr) {
if !ottest.Aligned8Byte(r, os.Stderr) {
os.Exit(1)
}

View File

@@ -0,0 +1,25 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import "unsafe"
func AtomicFieldOffsets() map[string]uintptr {
return map[string]uintptr{
"record.refMapped.value": unsafe.Offsetof(record{}.refMapped.value),
"record.modified": unsafe.Offsetof(record{}.modified),
"record.labels.cachedEncoderID": unsafe.Offsetof(record{}.labels.cachedEncoded),
}
}

View File

@@ -97,7 +97,7 @@ func (b *Batcher) Process(_ context.Context, record export.Record) error {
// Note also the possibility to speed this computation of
// "encoded" via "outputLabels" in the form of a (Descriptor,
// LabelSet)->(Labels, Encoded) cache.
// Labels)->(Labels, Encoded) cache.
iter := record.Labels().Iter()
for iter.Next() {
kv := iter.Label()

View File

@@ -89,7 +89,7 @@ func (*benchFixture) CheckpointSet() export.CheckpointSet {
func (*benchFixture) FinishedCollection() {
}
func makeLabelSets(n int) [][]core.KeyValue {
func makeManyLabels(n int) [][]core.KeyValue {
r := make([][]core.KeyValue, n)
for i := 0; i < n; i++ {
@@ -117,89 +117,82 @@ func makeLabels(n int) []core.KeyValue {
}
func benchmarkLabels(b *testing.B, n int) {
ctx := context.Background()
fix := newFixture(b)
labs := makeLabels(n)
cnt := fix.meter.NewInt64Counter("int64.counter")
b.ResetTimer()
for i := 0; i < b.N; i++ {
fix.sdk.Labels(labs...)
cnt.Add(ctx, 1, labs...)
}
}
func BenchmarkLabels_1(b *testing.B) {
func BenchmarkInt64CounterAddWithLabels_1(b *testing.B) {
benchmarkLabels(b, 1)
}
func BenchmarkLabels_2(b *testing.B) {
func BenchmarkInt64CounterAddWithLabels_2(b *testing.B) {
benchmarkLabels(b, 2)
}
func BenchmarkLabels_4(b *testing.B) {
func BenchmarkInt64CounterAddWithLabels_4(b *testing.B) {
benchmarkLabels(b, 4)
}
func BenchmarkLabels_8(b *testing.B) {
func BenchmarkInt64CounterAddWithLabels_8(b *testing.B) {
benchmarkLabels(b, 8)
}
func BenchmarkLabels_16(b *testing.B) {
func BenchmarkInt64CounterAddWithLabels_16(b *testing.B) {
benchmarkLabels(b, 16)
}
// Note: performance does not depend on label set size for the
// benchmarks below.
// benchmarks below--all are benchmarked for a single label.
func BenchmarkAcquireNewHandle(b *testing.B) {
fix := newFixture(b)
labelSets := makeLabelSets(b.N)
labelSets := makeManyLabels(b.N)
cnt := fix.meter.NewInt64Counter("int64.counter")
labels := make([]metric.LabelSet, b.N)
for i := 0; i < b.N; i++ {
labels[i] = fix.sdk.Labels(labelSets[i]...)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
cnt.Bind(labels[i])
cnt.Bind(labelSets[i]...)
}
}
func BenchmarkAcquireExistingHandle(b *testing.B) {
fix := newFixture(b)
labelSets := makeLabelSets(b.N)
labelSets := makeManyLabels(b.N)
cnt := fix.meter.NewInt64Counter("int64.counter")
labels := make([]metric.LabelSet, b.N)
for i := 0; i < b.N; i++ {
labels[i] = fix.sdk.Labels(labelSets[i]...)
cnt.Bind(labels[i]).Unbind()
cnt.Bind(labelSets[i]...).Unbind()
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
cnt.Bind(labels[i])
cnt.Bind(labelSets[i]...)
}
}
func BenchmarkAcquireReleaseExistingHandle(b *testing.B) {
fix := newFixture(b)
labelSets := makeLabelSets(b.N)
labelSets := makeManyLabels(b.N)
cnt := fix.meter.NewInt64Counter("int64.counter")
labels := make([]metric.LabelSet, b.N)
for i := 0; i < b.N; i++ {
labels[i] = fix.sdk.Labels(labelSets[i]...)
cnt.Bind(labels[i]).Unbind()
cnt.Bind(labelSets[i]...).Unbind()
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
cnt.Bind(labels[i]).Unbind()
cnt.Bind(labelSets[i]...).Unbind()
}
}
@@ -224,12 +217,10 @@ func benchmarkIterator(b *testing.B, n int) {
benchmarkIteratorVar = kv
return nil
})
labs := fix.sdk.Labels(makeLabels(n)...)
cnt := fix.meter.NewInt64Counter("int64.counter")
ctx := context.Background()
cnt.Add(ctx, 1, labs)
cnt.Add(ctx, 1, makeLabels(n)...)
b.StopTimer()
b.ResetTimer()
fix.sdk.Collect(ctx)
}
@@ -263,22 +254,22 @@ func BenchmarkIterator_16(b *testing.B) {
func BenchmarkInt64CounterAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labs := makeLabels(1)
cnt := fix.meter.NewInt64Counter("int64.counter")
b.ResetTimer()
for i := 0; i < b.N; i++ {
cnt.Add(ctx, 1, labs)
cnt.Add(ctx, 1, labs...)
}
}
func BenchmarkInt64CounterHandleAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labs := makeLabels(1)
cnt := fix.meter.NewInt64Counter("int64.counter")
handle := cnt.Bind(labs)
handle := cnt.Bind(labs...)
b.ResetTimer()
@@ -290,22 +281,22 @@ func BenchmarkInt64CounterHandleAdd(b *testing.B) {
func BenchmarkFloat64CounterAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labs := makeLabels(1)
cnt := fix.meter.NewFloat64Counter("float64.counter")
b.ResetTimer()
for i := 0; i < b.N; i++ {
cnt.Add(ctx, 1.1, labs)
cnt.Add(ctx, 1.1, labs...)
}
}
func BenchmarkFloat64CounterHandleAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labs := makeLabels(1)
cnt := fix.meter.NewFloat64Counter("float64.counter")
handle := cnt.Bind(labs)
handle := cnt.Bind(labs...)
b.ResetTimer()
@@ -319,22 +310,22 @@ func BenchmarkFloat64CounterHandleAdd(b *testing.B) {
func BenchmarkInt64LastValueAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labs := makeLabels(1)
mea := fix.meter.NewInt64Measure("int64.lastvalue")
b.ResetTimer()
for i := 0; i < b.N; i++ {
mea.Record(ctx, int64(i), labs)
mea.Record(ctx, int64(i), labs...)
}
}
func BenchmarkInt64LastValueHandleAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labs := makeLabels(1)
mea := fix.meter.NewInt64Measure("int64.lastvalue")
handle := mea.Bind(labs)
handle := mea.Bind(labs...)
b.ResetTimer()
@@ -346,22 +337,22 @@ func BenchmarkInt64LastValueHandleAdd(b *testing.B) {
func BenchmarkFloat64LastValueAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labs := makeLabels(1)
mea := fix.meter.NewFloat64Measure("float64.lastvalue")
b.ResetTimer()
for i := 0; i < b.N; i++ {
mea.Record(ctx, float64(i), labs)
mea.Record(ctx, float64(i), labs...)
}
}
func BenchmarkFloat64LastValueHandleAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labs := makeLabels(1)
mea := fix.meter.NewFloat64Measure("float64.lastvalue")
handle := mea.Bind(labs)
handle := mea.Bind(labs...)
b.ResetTimer()
@@ -375,22 +366,22 @@ func BenchmarkFloat64LastValueHandleAdd(b *testing.B) {
func benchmarkInt64MeasureAdd(b *testing.B, name string) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labs := makeLabels(1)
mea := fix.meter.NewInt64Measure(name)
b.ResetTimer()
for i := 0; i < b.N; i++ {
mea.Record(ctx, int64(i), labs)
mea.Record(ctx, int64(i), labs...)
}
}
func benchmarkInt64MeasureHandleAdd(b *testing.B, name string) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labs := makeLabels(1)
mea := fix.meter.NewInt64Measure(name)
handle := mea.Bind(labs)
handle := mea.Bind(labs...)
b.ResetTimer()
@@ -402,22 +393,22 @@ func benchmarkInt64MeasureHandleAdd(b *testing.B, name string) {
func benchmarkFloat64MeasureAdd(b *testing.B, name string) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labs := makeLabels(1)
mea := fix.meter.NewFloat64Measure(name)
b.ResetTimer()
for i := 0; i < b.N; i++ {
mea.Record(ctx, float64(i), labs)
mea.Record(ctx, float64(i), labs...)
}
}
func benchmarkFloat64MeasureHandleAdd(b *testing.B, name string) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labs := makeLabels(1)
mea := fix.meter.NewFloat64Measure(name)
handle := mea.Bind(labs)
handle := mea.Bind(labs...)
b.ResetTimer()
@@ -446,32 +437,30 @@ func BenchmarkObserverRegistration(b *testing.B) {
func BenchmarkObserverObservationInt64(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labs := makeLabels(1)
_ = fix.meter.RegisterInt64Observer("test.observer", func(result metric.Int64ObserverResult) {
b.StartTimer()
defer b.StopTimer()
for i := 0; i < b.N; i++ {
result.Observe((int64)(i), labs)
result.Observe((int64)(i), labs...)
}
})
b.StopTimer()
b.ResetTimer()
fix.sdk.Collect(ctx)
}
func BenchmarkObserverObservationFloat64(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labs := makeLabels(1)
_ = fix.meter.RegisterFloat64Observer("test.observer", func(result metric.Float64ObserverResult) {
b.StartTimer()
defer b.StopTimer()
for i := 0; i < b.N; i++ {
result.Observe((float64)(i), labs)
result.Observe((float64)(i), labs...)
}
})
b.StopTimer()
b.ResetTimer()
fix.sdk.Collect(ctx)
}
@@ -546,7 +535,7 @@ func benchmarkBatchRecord8Labels(b *testing.B, numInst int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
fix.sdk.RecordBatch(ctx, fix.sdk.Labels(labs...), meas...)
fix.sdk.RecordBatch(ctx, labs, meas...)
}
}

View File

@@ -193,7 +193,7 @@ func TestPushTicker(t *testing.T) {
p.Start()
counter.Add(ctx, 3, meter.Labels())
counter.Add(ctx, 3)
records, exports := fix.exporter.resetRecords()
checkpoints, finishes := fix.batcher.getCounts()
@@ -219,7 +219,7 @@ func TestPushTicker(t *testing.T) {
fix.checkpointSet.Reset()
counter.Add(ctx, 7, meter.Labels())
counter.Add(ctx, 7)
mock.Add(time.Second)
runtime.Gosched()
@@ -286,8 +286,8 @@ func TestPushExportError(t *testing.T) {
p.Start()
runtime.Gosched()
counter1.Add(ctx, 3, meter.Labels())
counter2.Add(ctx, 5, meter.Labels())
counter1.Add(ctx, 3)
counter2.Add(ctx, 5)
require.Equal(t, 0, fix.exporter.exports)
require.Nil(t, err)

View File

@@ -82,7 +82,7 @@ func TestInputRangeTestCounter(t *testing.T) {
counter := Must(meter).NewInt64Counter("name.counter")
counter.Add(ctx, -1, sdk.Labels())
counter.Add(ctx, -1)
require.Equal(t, aggregator.ErrNegativeInput, sdkErr)
sdkErr = nil
@@ -93,7 +93,7 @@ func TestInputRangeTestCounter(t *testing.T) {
require.Nil(t, err)
batcher.records = nil
counter.Add(ctx, 1, sdk.Labels())
counter.Add(ctx, 1)
checkpointed = sdk.Collect(ctx)
sum, err = batcher.records[0].Aggregator().(aggregator.Sum).Sum()
require.Equal(t, int64(1), sum.AsInt64())
@@ -117,7 +117,7 @@ func TestInputRangeTestMeasure(t *testing.T) {
measure := Must(meter).NewFloat64Measure("name.measure")
measure.Record(ctx, math.NaN(), sdk.Labels())
measure.Record(ctx, math.NaN())
require.Equal(t, aggregator.ErrNaNInput, sdkErr)
sdkErr = nil
@@ -127,8 +127,8 @@ func TestInputRangeTestMeasure(t *testing.T) {
require.Equal(t, 1, checkpointed)
require.Nil(t, err)
measure.Record(ctx, 1, sdk.Labels())
measure.Record(ctx, 2, sdk.Labels())
measure.Record(ctx, 1)
measure.Record(ctx, 2)
batcher.records = nil
checkpointed = sdk.Collect(ctx)
@@ -150,7 +150,7 @@ func TestDisabledInstrument(t *testing.T) {
measure := Must(meter).NewFloat64Measure("name.disabled")
measure.Record(ctx, -1, sdk.Labels())
measure.Record(ctx, -1)
checkpointed := sdk.Collect(ctx)
require.Equal(t, 0, checkpointed)
@@ -173,7 +173,7 @@ func TestRecordNaN(t *testing.T) {
c := Must(meter).NewFloat64Counter("sum.name")
require.Nil(t, sdkErr)
c.Add(ctx, math.NaN(), sdk.Labels())
c.Add(ctx, math.NaN())
require.Error(t, sdkErr)
}
@@ -219,14 +219,14 @@ func TestSDKLabelsDeduplication(t *testing.T) {
expectB = append(expectB, keysB[i].Int(repeats-1))
}
counter.Add(ctx, 1, sdk.Labels(kvsA...))
counter.Add(ctx, 1, sdk.Labels(kvsA...))
counter.Add(ctx, 1, kvsA...)
counter.Add(ctx, 1, kvsA...)
allExpect = append(allExpect, expectA)
if numKeys != 0 {
// In this case A and B sets are the same.
counter.Add(ctx, 1, sdk.Labels(kvsB...))
counter.Add(ctx, 1, sdk.Labels(kvsB...))
counter.Add(ctx, 1, kvsB...)
counter.Add(ctx, 1, kvsB...)
allExpect = append(allExpect, expectB)
}
@@ -290,12 +290,12 @@ func TestObserverCollection(t *testing.T) {
// following line we get 1-1==0 instead of -1:
// result.Observe(1, meter.Labels(key.String("A", "B")))
result.Observe(-1, meter.Labels(key.String("A", "B")))
result.Observe(-1, meter.Labels(key.String("C", "D")))
result.Observe(-1, key.String("A", "B"))
result.Observe(-1, key.String("C", "D"))
})
_ = Must(meter).RegisterInt64Observer("int.observer", func(result metric.Int64ObserverResult) {
result.Observe(1, meter.Labels(key.String("A", "B")))
result.Observe(1, meter.Labels())
result.Observe(1, key.String("A", "B"))
result.Observe(1)
})
_ = Must(meter).RegisterInt64Observer("empty.observer", func(result metric.Int64ObserverResult) {
})
@@ -315,5 +315,44 @@ func TestObserverCollection(t *testing.T) {
"int.observer/": 1,
"int.observer/A=B": 1,
}, out.Map)
}
func TestRecordBatch(t *testing.T) {
ctx := context.Background()
batcher := &correctnessBatcher{
t: t,
}
sdk := metricsdk.New(batcher)
meter := metric.WrapMeterImpl(sdk, "test")
counter1 := Must(meter).NewInt64Counter("int64.counter")
counter2 := Must(meter).NewFloat64Counter("float64.counter")
measure1 := Must(meter).NewInt64Measure("int64.measure")
measure2 := Must(meter).NewFloat64Measure("float64.measure")
sdk.RecordBatch(
ctx,
[]core.KeyValue{
key.String("A", "B"),
key.String("C", "D"),
},
counter1.Measurement(1),
counter2.Measurement(2),
measure1.Measurement(3),
measure2.Measurement(4),
)
sdk.Collect(ctx)
out := batchTest.NewOutput(export.NewDefaultLabelEncoder())
for _, rec := range batcher.records {
_ = out.AddTo(rec)
}
require.EqualValues(t, map[string]float64{
"int64.counter/A=B,C=D": 1,
"float64.counter/A=B,C=D": 2,
"int64.measure/A=B,C=D": 3,
"float64.measure/A=B,C=D": 4,
}, out.Map)
}

View File

@@ -40,9 +40,8 @@ func ExampleNew() {
meter := pusher.Meter("example")
counter := metric.Must(meter).NewInt64Counter("a.counter", metric.WithKeys(key))
labels := meter.Labels(key.String("value"))
counter.Add(ctx, 100, labels)
counter.Add(ctx, 100, key.String("value"))
// Output:
// {

View File

@@ -49,10 +49,6 @@ type (
// `*asyncInstrument` instances
asyncInstruments sync.Map
// empty is the (singleton) result of Labels()
// w/ zero arguments.
empty labels
// currentEpoch is the current epoch number. It is
// incremented in `Collect()`.
currentEpoch int64
@@ -68,6 +64,11 @@ type (
// resource represents the entity producing telemetry.
resource resource.Resource
// asyncSortSlice has a single purpose - as a temporary
// place for sorting during labels creation to avoid
// allocation. It is cleared after use.
asyncSortSlice sortedLabels
}
syncInstrument struct {
@@ -78,9 +79,8 @@ type (
// suitable for use as a map key.
orderedLabels interface{}
// labels implements the OpenTelemetry LabelSet API,
// represents an internalized set of labels that may be used
// repeatedly.
// labels represents an internalized set of labels that have been
// sorted and deduplicated.
labels struct {
// cachedEncoderID needs to be aligned for atomic access
cachedEncoderID int64
@@ -88,23 +88,18 @@ type (
// labels
cachedEncoded string
meter *SDK
// ordered is the output of sorting and deduplicating
// the labels, copied into an array of the correct
// size for use as a map key.
ordered orderedLabels
// sortSlice has a single purpose - as a temporary
// place for sorting during labels creation to avoid
// allocation
sortSlice sortedLabels
// cachedValue contains a `reflect.Value` of the `ordered`
// member
cachedValue reflect.Value
}
// mapkey uniquely describes a metric instrument in terms of
// its InstrumentID and the encoded form of its LabelSet.
// its InstrumentID and the encoded form of its labels.
mapkey struct {
descriptor *metric.Descriptor
ordered orderedLabels
@@ -125,8 +120,15 @@ type (
// modified has to be aligned for 64-bit atomic operations.
modified int64
// labels is the LabelSet passed by the user.
labels *labels
// labels is the processed label set for this record.
//
// labels has to be aligned for 64-bit atomic operations.
labels labels
// sortSlice has a single purpose - as a temporary
// place for sorting during labels creation to avoid
// allocation.
sortSlice sortedLabels
// inst is a pointer to the corresponding instrument.
inst *syncInstrument
@@ -148,13 +150,13 @@ type (
// labelset and recorder
recorders map[orderedLabels]labeledRecorder
callback func(func(core.Number, api.LabelSet))
callback func(func(core.Number, []core.KeyValue))
}
labeledRecorder struct {
recorder export.Aggregator
labels *labels
modifiedEpoch int64
labels labels
recorder export.Aggregator
}
ErrorHandler func(error)
@@ -162,7 +164,6 @@ type (
var (
_ api.MeterImpl = &SDK{}
_ api.LabelSet = &labels{}
_ api.AsyncImpl = &asyncInstrument{}
_ api.SyncImpl = &syncInstrument{}
_ api.BoundSyncImpl = &record{}
@@ -171,6 +172,11 @@ var (
_ export.Labels = &labels{}
kvType = reflect.TypeOf(core.KeyValue{})
emptyLabels = labels{
ordered: [0]core.KeyValue{},
cachedValue: reflect.ValueOf([0]core.KeyValue{}),
}
)
func (inst *instrument) Descriptor() api.Descriptor {
@@ -185,12 +191,12 @@ func (s *syncInstrument) Implementation() interface{} {
return s
}
func (a *asyncInstrument) observe(number core.Number, ls api.LabelSet) {
func (a *asyncInstrument) observe(number core.Number, labels []core.KeyValue) {
if err := aggregator.RangeTest(number, &a.descriptor); err != nil {
a.meter.errorHandler(err)
return
}
recorder := a.getRecorder(ls)
recorder := a.getRecorder(labels)
if recorder == nil {
// The instrument is disabled according to the
// AggregationSelector.
@@ -202,8 +208,12 @@ func (a *asyncInstrument) observe(number core.Number, ls api.LabelSet) {
}
}
func (a *asyncInstrument) getRecorder(ls api.LabelSet) export.Aggregator {
labels := a.meter.labsFor(ls)
func (a *asyncInstrument) getRecorder(kvs []core.KeyValue) export.Aggregator {
// We are in a single-threaded context. Note: this assumption
// could be violated if the user added concurrency within
// their callback.
labels := a.meter.makeLabels(kvs, &a.meter.asyncSortSlice)
lrec, ok := a.recorders[labels.ordered]
if ok {
lrec.modifiedEpoch = a.meter.currentEpoch
@@ -229,32 +239,50 @@ func (m *SDK) SetErrorHandler(f ErrorHandler) {
m.errorHandler = f
}
func (s *syncInstrument) acquireHandle(ls *labels) *record {
// Create lookup key for sync.Map (one allocation)
// acquireHandle gets or creates a `*record` corresponding to `kvs`,
// the input labels. The second argument `labels` is passed in to
// support re-use of the orderedLabels computed by a previous
// measurement in the same batch. This performs two allocations
// in the common case.
func (s *syncInstrument) acquireHandle(kvs []core.KeyValue, lptr *labels) *record {
var rec *record
var labels labels
if lptr == nil || lptr.ordered == nil {
// This memory allocation may not be used, but it's
// needed for the `sortSlice` field, to avoid an
// allocation while sorting.
rec = &record{}
labels = s.meter.makeLabels(kvs, &rec.sortSlice)
} else {
labels = *lptr
}
// Create lookup key for sync.Map (one allocation, as this
// passes through an interface{})
mk := mapkey{
descriptor: &s.descriptor,
ordered: ls.ordered,
ordered: labels.ordered,
}
if actual, ok := s.meter.current.Load(mk); ok {
// Existing record case, only one allocation so far.
rec := actual.(*record)
if rec.refMapped.ref() {
// Existing record case.
existingRec := actual.(*record)
if existingRec.refMapped.ref() {
// At this moment it is guaranteed that the entry is in
// the map and will not be removed.
return rec
return existingRec
}
// This entry is no longer mapped, try to add a new entry.
}
// There's a memory allocation here.
rec := &record{
labels: ls,
inst: s,
refMapped: refcountMapped{value: 2},
modified: 0,
recorder: s.meter.batcher.AggregatorFor(&s.descriptor),
if rec == nil {
rec = &record{}
}
rec.refMapped = refcountMapped{value: 2}
rec.labels = labels
rec.inst = s
rec.recorder = s.meter.batcher.AggregatorFor(&s.descriptor)
for {
// Load/Store: there's a memory allocation to place `mk` into
@@ -286,14 +314,12 @@ func (s *syncInstrument) acquireHandle(ls *labels) *record {
}
}
func (s *syncInstrument) Bind(ls api.LabelSet) api.BoundSyncImpl {
labs := s.meter.labsFor(ls)
return s.acquireHandle(labs)
func (s *syncInstrument) Bind(kvs []core.KeyValue) api.BoundSyncImpl {
return s.acquireHandle(kvs, nil)
}
func (s *syncInstrument) RecordOne(ctx context.Context, number core.Number, ls api.LabelSet) {
ourLs := s.meter.labsFor(ls)
h := s.acquireHandle(ourLs)
func (s *syncInstrument) RecordOne(ctx context.Context, number core.Number, kvs []core.KeyValue) {
h := s.acquireHandle(kvs, nil)
defer h.Unbind()
h.RecordOne(ctx, number)
}
@@ -313,43 +339,36 @@ func New(batcher export.Batcher, opts ...Option) *SDK {
opt.Apply(c)
}
m := &SDK{
empty: labels{
ordered: [0]core.KeyValue{},
},
return &SDK{
batcher: batcher,
errorHandler: c.ErrorHandler,
resource: c.Resource,
}
m.empty.meter = m
m.empty.cachedValue = reflect.ValueOf(m.empty.ordered)
return m
}
func DefaultErrorHandler(err error) {
fmt.Fprintln(os.Stderr, "Metrics SDK error:", err)
}
// Labels returns a LabelSet corresponding to the arguments. Passed labels
// makeLabels returns a `labels` corresponding to the arguments. Labels
// are sorted and de-duplicated, with last-value-wins semantics. Note that
// sorting and deduplicating happens in-place to avoid allocation, so the
// passed slice will be modified.
func (m *SDK) Labels(kvs ...core.KeyValue) api.LabelSet {
// passed slice will be modified. The `sortSlice` argument refers to a memory
// location used temporarily while sorting the slice, to avoid a memory
// allocation.
func (m *SDK) makeLabels(kvs []core.KeyValue, sortSlice *sortedLabels) labels {
// Check for empty set.
if len(kvs) == 0 {
return &m.empty
return emptyLabels
}
ls := &labels{ // allocation
meter: m,
sortSlice: kvs,
}
*sortSlice = kvs
// Sort and de-duplicate. Note: this use of `ls.sortSlice`
// avoids an allocation by using the address-able field rather
// than `kvs`.
sort.Stable(&ls.sortSlice)
ls.sortSlice = nil
// Sort and de-duplicate. Note: this use of `sortSlice`
// avoids an allocation because it is a pointer.
sort.Stable(sortSlice)
*sortSlice = nil
oi := 1
for i := 1; i < len(kvs); i++ {
@@ -362,8 +381,7 @@ func (m *SDK) Labels(kvs ...core.KeyValue) api.LabelSet {
oi++
}
kvs = kvs[0:oi]
ls.computeOrdered(kvs)
return ls
return computeOrderedLabels(kvs)
}
// NumLabels is a part of an implementation of the export.LabelStorage
@@ -429,12 +447,14 @@ func (ls *labels) Encoded(encoder export.LabelEncoder) string {
return encoded
}
func (ls *labels) computeOrdered(kvs []core.KeyValue) {
func computeOrderedLabels(kvs []core.KeyValue) labels {
var ls labels
ls.ordered = computeOrderedFixed(kvs)
if ls.ordered == nil {
ls.ordered = computeOrderedReflect(kvs)
}
ls.cachedValue = reflect.ValueOf(ls.ordered)
return ls
}
func computeOrderedFixed(kvs []core.KeyValue) orderedLabels {
@@ -492,18 +512,6 @@ func computeOrderedReflect(kvs []core.KeyValue) interface{} {
return at.Interface()
}
// labsFor sanitizes the input LabelSet. The input will be rejected
// if it was created by another Meter instance, for example.
func (m *SDK) labsFor(ls api.LabelSet) *labels {
if del, ok := ls.(api.LabelSetDelegate); ok {
ls = del.Delegate()
}
if l, _ := ls.(*labels); l != nil && l.meter == m {
return l
}
return &m.empty
}
func (m *SDK) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl, error) {
return &syncInstrument{
instrument: instrument{
@@ -513,7 +521,7 @@ func (m *SDK) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl, error)
}, nil
}
func (m *SDK) NewAsyncInstrument(descriptor api.Descriptor, callback func(func(core.Number, api.LabelSet))) (api.AsyncImpl, error) {
func (m *SDK) NewAsyncInstrument(descriptor api.Descriptor, callback func(func(core.Number, []core.KeyValue))) (api.AsyncImpl, error) {
a := &asyncInstrument{
instrument: instrument{
descriptor: descriptor,
@@ -551,6 +559,9 @@ func (m *SDK) collectRecords(ctx context.Context) int {
unmapped := inuse.refMapped.tryUnmap()
// If able to unmap then remove the record from the current Map.
if unmapped {
// TODO: Consider leaving the record in the map for one
// collection interval? Since creating records is relatively
// expensive, this would optimize common cases of ongoing use.
m.current.Delete(inuse.mapkey())
}
@@ -583,7 +594,7 @@ func (m *SDK) collectAsync(ctx context.Context) int {
}
func (m *SDK) checkpointRecord(ctx context.Context, r *record) int {
return m.checkpoint(ctx, &r.inst.descriptor, r.recorder, r.labels)
return m.checkpoint(ctx, &r.inst.descriptor, r.recorder, &r.labels)
}
func (m *SDK) checkpointAsync(ctx context.Context, a *asyncInstrument) int {
@@ -592,9 +603,10 @@ func (m *SDK) checkpointAsync(ctx context.Context, a *asyncInstrument) int {
}
checkpointed := 0
for encodedLabels, lrec := range a.recorders {
lrec := lrec
epochDiff := m.currentEpoch - lrec.modifiedEpoch
if epochDiff == 0 {
checkpointed += m.checkpoint(ctx, &a.descriptor, lrec.recorder, lrec.labels)
checkpointed += m.checkpoint(ctx, &a.descriptor, lrec.recorder, &lrec.labels)
} else if epochDiff > 1 {
// This is second collection cycle with no
// observations for this labelset. Remove the
@@ -633,9 +645,24 @@ func (m *SDK) Resource() resource.Resource {
}
// RecordBatch enters a batch of metric events.
func (m *SDK) RecordBatch(ctx context.Context, ls api.LabelSet, measurements ...api.Measurement) {
for _, meas := range measurements {
meas.SyncImpl().RecordOne(ctx, meas.Number(), ls)
func (m *SDK) RecordBatch(ctx context.Context, kvs []core.KeyValue, measurements ...api.Measurement) {
// Labels will be computed the first time acquireHandle is
// called. Subsequent calls to acquireHandle will re-use the
// previously computed value instead of recomputing the
// ordered labels.
var labels labels
for i, meas := range measurements {
s := meas.SyncImpl().(*syncInstrument)
h := s.acquireHandle(kvs, &labels)
// Re-use labels for the next measurement.
if i == 0 {
labels = h.labels
}
defer h.Unbind()
h.RecordOne(ctx, meas.Number())
}
}
@@ -645,11 +672,11 @@ func (r *record) RecordOne(ctx context.Context, number core.Number) {
return
}
if err := aggregator.RangeTest(number, &r.inst.descriptor); err != nil {
r.labels.meter.errorHandler(err)
r.inst.meter.errorHandler(err)
return
}
if err := r.recorder.Update(ctx, number, &r.inst.descriptor); err != nil {
r.labels.meter.errorHandler(err)
r.inst.meter.errorHandler(err)
return
}
}

View File

@@ -44,7 +44,7 @@ import (
const (
concurrencyPerCPU = 100
reclaimPeriod = time.Millisecond * 100
testRun = time.Second
testRun = 5 * time.Second
epsilon = 1e-10
)
@@ -75,7 +75,7 @@ type (
testImpl struct {
newInstrument func(meter api.Meter, name string) SyncImpler
getUpdateValue func() core.Number
operate func(interface{}, context.Context, core.Number, api.LabelSet)
operate func(interface{}, context.Context, core.Number, []core.KeyValue)
newStore func() interface{}
// storeCollect and storeExpect are the same for
@@ -167,7 +167,6 @@ func (f *testFixture) startWorker(impl *SDK, meter api.Meter, wg *sync.WaitGroup
}
kvs := f.someLabels()
clabs := canonicalizeLabels(kvs)
labs := meter.Labels(kvs...)
dur := getPeriod()
key := testKey{
labels: clabs,
@@ -177,7 +176,7 @@ func (f *testFixture) startWorker(impl *SDK, meter api.Meter, wg *sync.WaitGroup
sleep := time.Duration(rand.ExpFloat64() * float64(dur))
time.Sleep(sleep)
value := f.impl.getUpdateValue()
f.impl.operate(instrument, ctx, value, labs)
f.impl.operate(instrument, ctx, value, kvs)
actual, _ := f.expected.LoadOrStore(key, f.impl.newStore())
@@ -191,6 +190,7 @@ func (f *testFixture) startWorker(impl *SDK, meter api.Meter, wg *sync.WaitGroup
}
func (f *testFixture) assertTest(numCollect int) {
var allErrs []func()
csize := 0
f.received.Range(func(key, gstore interface{}) bool {
csize++
@@ -198,13 +198,18 @@ func (f *testFixture) assertTest(numCollect int) {
estore, loaded := f.expected.Load(key)
if !loaded {
f.T.Error("Could not locate expected key: ", key)
allErrs = append(allErrs, func() {
f.T.Error("Could not locate expected key: ", key)
})
return true
}
evalue := f.impl.readStore(estore)
if !f.impl.equalValues(evalue, gvalue) {
f.T.Error("Expected value mismatch: ",
evalue, "!=", gvalue, " for ", key)
allErrs = append(allErrs, func() {
f.T.Error("Expected value mismatch: ",
evalue, "!=", gvalue, " for ", key)
})
}
return true
})
@@ -212,7 +217,9 @@ func (f *testFixture) assertTest(numCollect int) {
f.expected.Range(func(key, value interface{}) bool {
rsize++
if _, loaded := f.received.Load(key); !loaded {
f.T.Error("Did not receive expected key: ", key)
allErrs = append(allErrs, func() {
f.T.Error("Did not receive expected key: ", key)
})
}
return true
})
@@ -220,6 +227,10 @@ func (f *testFixture) assertTest(numCollect int) {
f.T.Error("Did not receive the correct set of metrics: Received != Expected", rsize, csize)
}
for _, anErr := range allErrs {
anErr()
}
// Note: It's useful to know the test triggers this condition,
// but we can't assert it. Infrequently no duplicates are
// found, and we can't really force a race to happen.
@@ -353,9 +364,9 @@ func intCounterTestImpl() testImpl {
}
}
},
operate: func(inst interface{}, ctx context.Context, value core.Number, labels api.LabelSet) {
operate: func(inst interface{}, ctx context.Context, value core.Number, labels []core.KeyValue) {
counter := inst.(api.Int64Counter)
counter.Add(ctx, value.AsInt64(), labels)
counter.Add(ctx, value.AsInt64(), labels...)
},
newStore: func() interface{} {
n := core.NewInt64Number(0)
@@ -391,9 +402,9 @@ func floatCounterTestImpl() testImpl {
}
}
},
operate: func(inst interface{}, ctx context.Context, value core.Number, labels api.LabelSet) {
operate: func(inst interface{}, ctx context.Context, value core.Number, labels []core.KeyValue) {
counter := inst.(api.Float64Counter)
counter.Add(ctx, value.AsFloat64(), labels)
counter.Add(ctx, value.AsFloat64(), labels...)
},
newStore: func() interface{} {
n := core.NewFloat64Number(0.0)
@@ -427,9 +438,9 @@ func intLastValueTestImpl() testImpl {
r1 := rand.Int63()
return core.NewInt64Number(rand.Int63() - r1)
},
operate: func(inst interface{}, ctx context.Context, value core.Number, labels api.LabelSet) {
operate: func(inst interface{}, ctx context.Context, value core.Number, labels []core.KeyValue) {
measure := inst.(api.Int64Measure)
measure.Record(ctx, value.AsInt64(), labels)
measure.Record(ctx, value.AsInt64(), labels...)
},
newStore: func() interface{} {
return &lastValueState{
@@ -468,9 +479,9 @@ func floatLastValueTestImpl() testImpl {
getUpdateValue: func() core.Number {
return core.NewFloat64Number((-0.5 + rand.Float64()) * 100000)
},
operate: func(inst interface{}, ctx context.Context, value core.Number, labels api.LabelSet) {
operate: func(inst interface{}, ctx context.Context, value core.Number, labels []core.KeyValue) {
measure := inst.(api.Float64Measure)
measure.Record(ctx, value.AsFloat64(), labels)
measure.Record(ctx, value.AsFloat64(), labels...)
},
newStore: func() interface{} {
return &lastValueState{