
Checkpoint only after Update; Keep records in the sync.Map longer (#647)

* Add a test

* Benchmark

* Ensure records can persist across intervals

* Remove dead code

* Address feedback, refactor & eliminate race
Joshua MacDonald 2020-04-21 20:23:15 -07:00 committed by GitHub
parent e554562513
commit 395440db10
5 changed files with 107 additions and 49 deletions
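
In outline: the per-record `modified` flag (an atomic boolean reset on every Collect) is replaced by two counters. `updateCount` is incremented atomically on every Update; `collectedCount` is a collector-private snapshot of it. A record is checkpointed only when the two differ, and it is unmapped from the sync.Map only after a full interval with no updates and no active bindings, so ongoing direct calls keep reusing the same record. Below is a minimal, self-contained sketch of that pattern; the field names mirror the diff, but the type is illustrative, not the SDK's record.

package main

import (
	"fmt"
	"sync/atomic"
)

// record is an illustrative stand-in for the SDK's record type.
type record struct {
	updateCount    int64 // incremented atomically on every Update
	collectedCount int64 // collector-owned snapshot of updateCount
}

// update is the hot path, called concurrently by instrument users.
func (r *record) update() { atomic.AddInt64(&r.updateCount, 1) }

// collect checkpoints only if updates arrived since the last round;
// it reports whether the record was active this interval.
func (r *record) collect() bool {
	mods := atomic.LoadInt64(&r.updateCount)
	if mods == r.collectedCount {
		return false // idle: a candidate for unmapping
	}
	r.collectedCount = mods
	fmt.Printf("checkpoint at %d updates\n", mods)
	return true
}

func main() {
	r := &record{}
	r.update()
	fmt.Println(r.collect()) // true: there was an update
	fmt.Println(r.collect()) // false: idle, may be unmapped
}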

File 1 of 5 (atomic field offsets):

@@ -19,7 +19,7 @@ import "unsafe"
 func AtomicFieldOffsets() map[string]uintptr {
 	return map[string]uintptr{
 		"record.refMapped.value":        unsafe.Offsetof(record{}.refMapped.value),
-		"record.modified":               unsafe.Offsetof(record{}.modified),
+		"record.updateCount":            unsafe.Offsetof(record{}.updateCount),
 		"record.labels.cachedEncoderID": unsafe.Offsetof(record{}.labels.cachedEncoded),
 	}
 }
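
This map feeds an alignment check: the fields named here are accessed with 64-bit atomics, which must sit at 8-byte-aligned offsets on 32-bit platforms (per the Bugs note in sync/atomic). A hypothetical sketch of such a check, assuming it runs in the same package; the repository's actual test may differ:

// TestAtomicFieldAlignment is a hypothetical test for illustration:
// it verifies every offset reported above is 8-byte aligned.
func TestAtomicFieldAlignment(t *testing.T) {
	for name, offset := range AtomicFieldOffsets() {
		if offset%8 != 0 {
			t.Errorf("%s: offset %d is not 8-byte aligned", name, offset)
		}
	}
}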

File 2 of 5 (benchmarks):

@@ -555,7 +555,30 @@ func BenchmarkBatchRecord_8Labels_8Instruments(b *testing.B) {
 	benchmarkBatchRecord8Labels(b, 8)
 }
 
+// Record creation
+func BenchmarkRepeatedDirectCalls(b *testing.B) {
+	ctx := context.Background()
+	fix := newFixture(b)
+
+	encoder := export.NewDefaultLabelEncoder()
+
+	fix.pcb = func(_ context.Context, rec export.Record) error {
+		_ = rec.Labels().Encoded(encoder)
+		return nil
+	}
+
+	c := fix.meter.NewInt64Counter("int64.counter")
+	k := key.String("bench", "true")
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		c.Add(ctx, 1, k)
+		fix.sdk.Collect(ctx)
+	}
+}
+
 // LabelIterator
 func BenchmarkLabelIterator(b *testing.B) {
 	const labelCount = 1024
 	ctx := context.Background()

File 3 of 5 (correctness tests):

@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"math"
 	"strings"
+	"sync/atomic"
 	"testing"
 
 	"github.com/stretchr/testify/require"
@@ -37,21 +38,28 @@ import (
 var Must = metric.Must
 
 type correctnessBatcher struct {
+	newAggCount int64
+
 	t       *testing.T
 	records []export.Record
 }
 
-func (cb *correctnessBatcher) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
+func (cb *correctnessBatcher) AggregatorFor(descriptor *metric.Descriptor) (agg export.Aggregator) {
 	name := descriptor.Name()
 	switch {
 	case strings.HasSuffix(name, ".counter"):
-		return sum.New()
+		agg = sum.New()
 	case strings.HasSuffix(name, ".disabled"):
-		return nil
+		agg = nil
 	default:
-		return array.New()
+		agg = array.New()
 	}
+
+	if agg != nil {
+		atomic.AddInt64(&cb.newAggCount, 1)
+	}
+	return
 }
 
 func (cb *correctnessBatcher) CheckpointSet() export.CheckpointSet {
@@ -87,15 +95,12 @@ func TestInputRangeTestCounter(t *testing.T) {
 	sdkErr = nil
 
 	checkpointed := sdk.Collect(ctx)
-	sum, err := batcher.records[0].Aggregator().(aggregator.Sum).Sum()
-	require.Equal(t, int64(0), sum.AsInt64())
-	require.Equal(t, 1, checkpointed)
-	require.Nil(t, err)
+	require.Equal(t, 0, checkpointed)
 
 	batcher.records = nil
 	counter.Add(ctx, 1)
 	checkpointed = sdk.Collect(ctx)
-	sum, err = batcher.records[0].Aggregator().(aggregator.Sum).Sum()
+	sum, err := batcher.records[0].Aggregator().(aggregator.Sum).Sum()
 	require.Equal(t, int64(1), sum.AsInt64())
 	require.Equal(t, 1, checkpointed)
 	require.Nil(t, err)
@@ -122,10 +127,7 @@ func TestInputRangeTestMeasure(t *testing.T) {
 	sdkErr = nil
 
 	checkpointed := sdk.Collect(ctx)
-	count, err := batcher.records[0].Aggregator().(aggregator.Distribution).Count()
-	require.Equal(t, int64(0), count)
-	require.Equal(t, 1, checkpointed)
-	require.Nil(t, err)
+	require.Equal(t, 0, checkpointed)
 
 	measure.Record(ctx, 1)
 	measure.Record(ctx, 2)
@@ -133,7 +135,7 @@
 	batcher.records = nil
 	checkpointed = sdk.Collect(ctx)
-	count, err = batcher.records[0].Aggregator().(aggregator.Distribution).Count()
+	count, err := batcher.records[0].Aggregator().(aggregator.Distribution).Count()
 	require.Equal(t, int64(2), count)
 	require.Equal(t, 1, checkpointed)
 	require.Nil(t, sdkErr)
@@ -356,3 +358,28 @@ func TestRecordBatch(t *testing.T) {
 		"float64.measure/A=B,C=D": 4,
 	}, out.Map)
 }
+
+// TestRecordPersistence ensures that a direct-called instrument that
+// is repeatedly used each interval results in a persistent record, so
+// that its encoded labels will be cached across collection intervals.
+func TestRecordPersistence(t *testing.T) {
+	ctx := context.Background()
+	batcher := &correctnessBatcher{
+		t: t,
+	}
+
+	sdk := metricsdk.New(batcher)
+	meter := metric.WrapMeterImpl(sdk, "test")
+
+	c := Must(meter).NewFloat64Counter("sum.name")
+	b := c.Bind(key.String("bound", "true"))
+	uk := key.String("bound", "false")
+
+	for i := 0; i < 100; i++ {
+		c.Add(ctx, 1, uk)
+		b.Add(ctx, 1)
+		sdk.Collect(ctx)
+	}
+
+	require.Equal(t, int64(2), batcher.newAggCount)
+}
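
The final assertion carries the point of the test: over 100 rounds of update-then-Collect, exactly two aggregators are created, one for the bound label set and one for the unbound set. Without the persistence change, a record with no active binding could be unmapped on every Collect, so the direct-called instrument would pay for a new record, aggregator, and label encoding on nearly every iteration.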

File 4 of 5 (refcountMapped):

@@ -42,12 +42,6 @@ func (rm *refcountMapped) unref() {
 	atomic.AddInt64(&rm.value, -2)
 }
 
-// inUse returns true if there is a reference to the entry and it is mapped.
-func (rm *refcountMapped) inUse() bool {
-	val := atomic.LoadInt64(&rm.value)
-	return val >= 2 && val&1 == 0
-}
-
 // tryUnmap flips the mapped bit to "unmapped" state and returns true if both of the
 // following conditions are true upon entry to this function:
 // * There are no active references;
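
The deleted inUse helper documents the encoding this type relies on: the low bit of value is the unmapped flag and each reference adds two, so `val >= 2 && val&1 == 0` meant "referenced and still mapped". A hypothetical reconstruction of the full type, consistent with that encoding but not the repository's actual file:

import "sync/atomic"

type refcountMapped struct {
	// value packs the reference count (in steps of two) with the
	// unmapped flag in the low bit; zero means mapped, unreferenced.
	value int64
}

// tryRef takes a reference unless the entry was already unmapped, in
// which case the caller backs off and looks the map entry up again.
func (rm *refcountMapped) tryRef() bool {
	if atomic.AddInt64(&rm.value, 2)&1 != 0 {
		atomic.AddInt64(&rm.value, -2) // undo: entry is being evicted
		return false
	}
	return true
}

func (rm *refcountMapped) unref() { atomic.AddInt64(&rm.value, -2) }

// tryUnmap sets the unmapped bit, succeeding only when no
// references are held (value == 0).
func (rm *refcountMapped) tryUnmap() bool {
	return atomic.CompareAndSwapInt64(&rm.value, 0, 1)
}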

File 5 of 5 (the metric SDK):

@@ -110,11 +110,12 @@ type (
 		// SDK.current map.
 		refMapped refcountMapped
 
-		// modified is an atomic boolean that tracks if the current record
-		// was modified since the last Collect().
-		//
-		// modified has to be aligned for 64-bit atomic operations.
-		modified int64
+		// updateCount is incremented on every Update.
+		updateCount int64
+
+		// collectedCount is set to updateCount on collection,
+		// supports checking for no updates during a round.
+		collectedCount int64
 
 		// labels is the processed label set for this record.
 		//
@@ -150,7 +151,7 @@ type (
 	}
 
 	labeledRecorder struct {
-		modifiedEpoch int64
+		observedEpoch int64
 		labels        labels
 		recorder      export.Aggregator
 	}
@@ -211,12 +212,12 @@ func (a *asyncInstrument) getRecorder(kvs []core.KeyValue) export.Aggregator {
 	lrec, ok := a.recorders[labels.ordered]
 	if ok {
-		if lrec.modifiedEpoch == a.meter.currentEpoch {
+		if lrec.observedEpoch == a.meter.currentEpoch {
 			// last value wins for Observers, so if we see the same labels
 			// in the current epoch, we replace the old recorder
 			lrec.recorder = a.meter.batcher.AggregatorFor(&a.descriptor)
 		} else {
-			lrec.modifiedEpoch = a.meter.currentEpoch
+			lrec.observedEpoch = a.meter.currentEpoch
 		}
 		a.recorders[labels.ordered] = lrec
 		return lrec.recorder
@@ -231,7 +232,7 @@ func (a *asyncInstrument) getRecorder(kvs []core.KeyValue) export.Aggregator {
 	a.recorders[labels.ordered] = labeledRecorder{
 		recorder:      rec,
 		labels:        labels,
-		modifiedEpoch: a.meter.currentEpoch,
+		observedEpoch: a.meter.currentEpoch,
 	}
 	return rec
 }
@@ -557,25 +558,39 @@ func (m *SDK) collectRecords(ctx context.Context) int {
 	checkpointed := 0
 	m.current.Range(func(key interface{}, value interface{}) bool {
+		// Note: always continue to iterate over the entire
+		// map by returning `true` in this function.
 		inuse := value.(*record)
-		unmapped := inuse.refMapped.tryUnmap()
-		// If able to unmap then remove the record from the current Map.
-		if unmapped {
-			// TODO: Consider leaving the record in the map for one
-			// collection interval?  Since creating records is relatively
-			// expensive, this would optimize common cases of ongoing use.
-			m.current.Delete(inuse.mapkey())
-		}
+
+		mods := atomic.LoadInt64(&inuse.updateCount)
+		coll := inuse.collectedCount
+
+		if mods != coll {
+			// Updates happened in this interval,
+			// checkpoint and continue.
+			checkpointed += m.checkpointRecord(ctx, inuse)
+			inuse.collectedCount = mods
+			return true
+		}
 
-		// Always report the values if a reference to the Record is active,
-		// this is to keep the previous behavior.
-		// TODO: Reconsider this logic.
-		if inuse.refMapped.inUse() || atomic.LoadInt64(&inuse.modified) != 0 {
-			atomic.StoreInt64(&inuse.modified, 0)
+		// Having no updates since last collection, try to unmap:
+		if unmapped := inuse.refMapped.tryUnmap(); !unmapped {
+			// The record is referenced by a binding, continue.
+			return true
+		}
+
+		// If any other goroutines are now trying to re-insert this
+		// entry in the map, they are busy calling Gosched() awaiting
+		// this deletion:
+		m.current.Delete(inuse.mapkey())
+
+		// There's a potential race between `LoadInt64` and
+		// `tryUnmap` in this function.  Since this is the
+		// last we'll see of this record, checkpoint
+		mods = atomic.LoadInt64(&inuse.updateCount)
+		if mods != coll {
 			checkpointed += m.checkpointRecord(ctx, inuse)
 		}
-
-		// Always continue to iterate over the entire map.
 		return true
 	})
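
The ordering here is what eliminates the race: updateCount is loaded before tryUnmap, so an Update landing between the two could be missed. Once tryUnmap succeeds, no new reference can be taken through the map entry, and after the Delete any writer spinning on Gosched() will install a fresh record. Re-reading updateCount at that point therefore observes any update that raced with the unmap, and the record is checkpointed one last time before being dropped.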
@@ -606,7 +621,7 @@ func (m *SDK) checkpointAsync(ctx context.Context, a *asyncInstrument) int {
 	checkpointed := 0
 	for encodedLabels, lrec := range a.recorders {
 		lrec := lrec
-		epochDiff := m.currentEpoch - lrec.modifiedEpoch
+		epochDiff := m.currentEpoch - lrec.observedEpoch
 		if epochDiff == 0 {
 			checkpointed += m.checkpoint(ctx, &a.descriptor, lrec.recorder, &lrec.labels)
 		} else if epochDiff > 1 {
@@ -681,13 +696,12 @@ func (r *record) RecordOne(ctx context.Context, number core.Number) {
 		r.inst.meter.errorHandler(err)
 		return
 	}
+	// Record was modified, inform the Collect() that things need
+	// to be collected while the record is still mapped.
+	atomic.AddInt64(&r.updateCount, 1)
 }
 
 func (r *record) Unbind() {
-	// Record was modified, inform the Collect() that things need to be collected.
-	// TODO: Reconsider if we should marked as modified when an Update happens and
-	// collect only when updates happened even for Bounds.
-	atomic.StoreInt64(&r.modified, 1)
 	r.refMapped.unref()
 }