1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-04-15 11:36:44 +02:00

Refactor metric records logic (#468)

* Refactor metric records logic.

Signed-off-by: Bogdan Cristian Drutu <bogdandrutu@gmail.com>

* Fix lint errors

Signed-off-by: Bogdan Cristian Drutu <bogdandrutu@gmail.com>

* Fix a bug that we try to readd the old entry instead of a new one.

Signed-off-by: Bogdan Cristian Drutu <bogdandrutu@gmail.com>

* Update comments in refcount_mapped.

Signed-off-by: Bogdan Cristian Drutu <bogdandrutu@gmail.com>

* Remove the need to use a records list, iterate over the map.

Signed-off-by: Bogdan Cristian Drutu <bogdandrutu@gmail.com>

* Fix comments and typos

Signed-off-by: Bogdan Cristian Drutu <bogdandrutu@gmail.com>

* Fix more comments

Signed-off-by: Bogdan Cristian Drutu <bogdandrutu@gmail.com>

* Clarify tryUnmap comment

Signed-off-by: Bogdan Cristian Drutu <bogdandrutu@gmail.com>

* Fix one more typo.

Signed-off-by: Bogdan Cristian Drutu <bogdandrutu@gmail.com>
This commit is contained in:
Bogdan Drutu 2020-02-06 14:45:56 -08:00 committed by GitHub
parent 4818358f94
commit 69df67d449
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 130 additions and 207 deletions

View File

@ -12,20 +12,12 @@ import (
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
fields := []ottest.FieldOffset{ fields := []ottest.FieldOffset{
{ {
Name: "record.refcount", Name: "record.refMapped.value",
Offset: unsafe.Offsetof(record{}.refcount), Offset: unsafe.Offsetof(record{}.refMapped.value),
}, },
{ {
Name: "record.collectedEpoch", Name: "record.modified",
Offset: unsafe.Offsetof(record{}.collectedEpoch), Offset: unsafe.Offsetof(record{}.modified),
},
{
Name: "record.modifiedEpoch",
Offset: unsafe.Offsetof(record{}.modifiedEpoch),
},
{
Name: "record.reclaim",
Offset: unsafe.Offsetof(record{}.reclaim),
}, },
} }
if !ottest.Aligned8Byte(fields, os.Stderr) { if !ottest.Aligned8Byte(fields, os.Stderr) {

View File

@ -14,11 +14,6 @@
package metric package metric
import (
"sync/atomic"
"unsafe"
)
func (l *sortedLabels) Len() int { func (l *sortedLabels) Len() int {
return len(*l) return len(*l)
} }
@ -30,51 +25,3 @@ func (l *sortedLabels) Swap(i, j int) {
func (l *sortedLabels) Less(i, j int) bool { func (l *sortedLabels) Less(i, j int) bool {
return (*l)[i].Key < (*l)[j].Key return (*l)[i].Key < (*l)[j].Key
} }
func (m *SDK) addPrimary(rec *record) {
for {
rec.next.primary.store(m.records.primary.load())
if atomic.CompareAndSwapPointer(
&m.records.primary.ptr,
rec.next.primary.ptr,
unsafe.Pointer(rec),
) {
return
}
}
}
func (m *SDK) addReclaim(rec *record) {
for {
rec.next.reclaim.store(m.records.reclaim.load())
if atomic.CompareAndSwapPointer(
&m.records.reclaim.ptr,
rec.next.reclaim.ptr,
unsafe.Pointer(rec),
) {
return
}
}
}
func (s *singlePtr) swapNil() *record {
for {
newValue := unsafe.Pointer(nil)
swapped := atomic.LoadPointer(&s.ptr)
if atomic.CompareAndSwapPointer(&s.ptr, swapped, newValue) {
return (*record)(swapped)
}
}
}
func (s *singlePtr) load() *record {
return (*record)(atomic.LoadPointer(&s.ptr))
}
func (s *singlePtr) store(r *record) {
atomic.StorePointer(&s.ptr, unsafe.Pointer(r))
}
func (s *singlePtr) clear() {
atomic.StorePointer(&s.ptr, unsafe.Pointer(nil))
}

View File

@ -0,0 +1,65 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"sync/atomic"
)
// refcountMapped atomically counts the number of references (usages) of an entry
// while also keeping a state of mapped/unmapped into a different data structure
// (an external map or list for example).
//
// refcountMapped uses an atomic value where the least significant bit is used to
// keep the state of mapping ('1' is used for unmapped and '0' is for mapped) and
// the rest of the bits are used for refcounting.
type refcountMapped struct {
// refcount has to be aligned for 64-bit atomic operations.
value int64
}
// ref returns true if the entry is still mapped and increases the
// reference usages, if unmapped returns false.
func (rm *refcountMapped) ref() bool {
// Check if this entry was marked as unmapped between the moment
// we got a reference to it (or will be removed very soon) and here.
return atomic.AddInt64(&rm.value, 2)&1 == 0
}
func (rm *refcountMapped) unref() {
atomic.AddInt64(&rm.value, -2)
}
// inUse returns true if there is a reference to the entry and it is mapped.
func (rm *refcountMapped) inUse() bool {
val := atomic.LoadInt64(&rm.value)
return val >= 2 && val&1 == 0
}
// tryUnmap flips the mapped bit to "unmapped" state and returns true if both of the
// following conditions are true upon entry to this function:
// * There are no active references;
// * The mapped bit is in "mapped" state.
// Otherwise no changes are done to mapped bit and false is returned.
func (rm *refcountMapped) tryUnmap() bool {
if atomic.LoadInt64(&rm.value) != 0 {
return false
}
return atomic.CompareAndSwapInt64(
&rm.value,
0,
1,
)
}

View File

@ -21,7 +21,6 @@ import (
"sort" "sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
"unsafe"
"go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/api/metric"
@ -47,10 +46,6 @@ type (
// w/ zero arguments. // w/ zero arguments.
empty labels empty labels
// records is the head of both the primary and the
// reclaim records lists.
records doublePtr
// currentEpoch is the current epoch number. It is // currentEpoch is the current epoch number. It is
// incremented in `Collect()`. // incremented in `Collect()`.
currentEpoch int64 currentEpoch int64
@ -97,32 +92,15 @@ type (
// `record` in existence at a time, although at most one can // `record` in existence at a time, although at most one can
// be referenced from the `SDK.current` map. // be referenced from the `SDK.current` map.
record struct { record struct {
// refcount counts the number of active handles on // refMapped keeps track of refcounts and the mapping state to the
// referring to this record. active handles prevent // SDK.current map.
// removing the record from the current map. refMapped refcountMapped
//
// refcount has to be aligned for 64-bit atomic operations.
refcount int64
// collectedEpoch is the epoch number for which this // modified is an atomic boolean that tracks if the current record
// record has been exported. This is modified by the // was modified since the last Collect().
// `Collect()` method.
// //
// collectedEpoch has to be aligned for 64-bit atomic operations. // modified has to be aligned for 64-bit atomic operations.
collectedEpoch int64 modified int64
// modifiedEpoch is the latest epoch number for which
// this record was updated. Generally, if
// modifiedEpoch is less than collectedEpoch, this
// record is due for reclaimation.
//
// modifiedEpoch has to be aligned for 64-bit atomic operations.
modifiedEpoch int64
// reclaim is an atomic to control the start of reclaiming.
//
// reclaim has to be aligned for 64-bit atomic operations.
reclaim int64
// labels is the LabelSet passed by the user. // labels is the LabelSet passed by the user.
labels *labels labels *labels
@ -134,25 +112,9 @@ type (
// depending on the type of aggregation. If nil, the // depending on the type of aggregation. If nil, the
// metric was disabled by the exporter. // metric was disabled by the exporter.
recorder export.Aggregator recorder export.Aggregator
// next contains the next pointer for both the primary
// and the reclaim lists.
next doublePtr
} }
ErrorHandler func(error) ErrorHandler func(error)
// singlePointer wraps an unsafe.Pointer and supports basic
// load(), store(), clear(), and swapNil() operations.
singlePtr struct {
ptr unsafe.Pointer
}
// doublePtr is used for the head and next links of two lists.
doublePtr struct {
primary singlePtr
reclaim singlePtr
}
) )
var ( var (
@ -160,12 +122,6 @@ var (
_ api.LabelSet = &labels{} _ api.LabelSet = &labels{}
_ api.InstrumentImpl = &instrument{} _ api.InstrumentImpl = &instrument{}
_ api.BoundInstrumentImpl = &record{} _ api.BoundInstrumentImpl = &record{}
// hazardRecord is used as a pointer value that indicates the
// value is not included in any list. (`nil` would be
// ambiguous, since the final element in a list has `nil` as
// the next pointer).
hazardRecord = &record{}
) )
func (i *instrument) Meter() api.Meter { func (i *instrument) Meter() api.Meter {
@ -186,31 +142,48 @@ func (i *instrument) acquireHandle(ls *labels) *record {
if actual, ok := i.meter.current.Load(mk); ok { if actual, ok := i.meter.current.Load(mk); ok {
// Existing record case, only one allocation so far. // Existing record case, only one allocation so far.
rec := actual.(*record) rec := actual.(*record)
atomic.AddInt64(&rec.refcount, 1) if rec.refMapped.ref() {
// At this moment it is guaranteed that the entry is in
// the map and will not be removed.
return rec return rec
} }
// This entry is no longer mapped, try to add a new entry.
}
// There's a memory allocation here. // There's a memory allocation here.
rec := &record{ rec := &record{
labels: ls, labels: ls,
descriptor: i.descriptor, descriptor: i.descriptor,
refcount: 1, refMapped: refcountMapped{value: 2},
collectedEpoch: -1, modified: 0,
modifiedEpoch: 0,
recorder: i.meter.batcher.AggregatorFor(i.descriptor), recorder: i.meter.batcher.AggregatorFor(i.descriptor),
} }
for {
// Load/Store: there's a memory allocation to place `mk` into // Load/Store: there's a memory allocation to place `mk` into
// an interface here. // an interface here.
if actual, loaded := i.meter.current.LoadOrStore(mk, rec); loaded { if actual, loaded := i.meter.current.LoadOrStore(mk, rec); loaded {
// Existing record case. // Existing record case. Cannot change rec here because if fail
rec = actual.(*record) // will try to add rec again to avoid new allocations.
atomic.AddInt64(&rec.refcount, 1) oldRec := actual.(*record)
if oldRec.refMapped.ref() {
// At this moment it is guaranteed that the entry is in
// the map and will not be removed.
return oldRec
}
// This loaded entry is marked as unmapped (so Collect will remove
// it from the map immediately), try again - this is a busy waiting
// strategy to wait until Collect() removes this entry from the map.
//
// This can be improved by having a list of "Unmapped" entries for
// one time only usages, OR we can make this a blocking path and use
// a Mutex that protects the delete operation (delete only if the old
// record is associated with the key).
continue
}
// The new entry was added to the map, good to go.
return rec return rec
} }
i.meter.addPrimary(rec)
return rec
} }
func (i *instrument) Bind(ls api.LabelSet) api.BoundInstrumentImpl { func (i *instrument) Bind(ls api.LabelSet) api.BoundInstrumentImpl {
@ -355,22 +328,6 @@ func (m *SDK) NewFloat64Measure(name string, mos ...api.MeasureOptionApplier) ap
return api.WrapFloat64MeasureInstrument(m.newMeasureInstrument(name, core.Float64NumberKind, mos...)) return api.WrapFloat64MeasureInstrument(m.newMeasureInstrument(name, core.Float64NumberKind, mos...))
} }
// saveFromReclaim puts a record onto the "reclaim" list when it
// detects an attempt to delete the record while it is still in use.
func (m *SDK) saveFromReclaim(rec *record) {
for {
reclaimed := atomic.LoadInt64(&rec.reclaim)
if reclaimed != 0 {
return
}
if atomic.CompareAndSwapInt64(&rec.reclaim, 0, 1) {
break
}
}
m.addReclaim(rec)
}
// Collect traverses the list of active records and exports data for // Collect traverses the list of active records and exports data for
// each active instrument. Collect() may not be called concurrently. // each active instrument. Collect() may not be called concurrently.
// //
@ -384,46 +341,26 @@ func (m *SDK) Collect(ctx context.Context) int {
checkpointed := 0 checkpointed := 0
var next *record m.current.Range(func(key interface{}, value interface{}) bool {
for inuse := m.records.primary.swapNil(); inuse != nil; inuse = next { inuse := value.(*record)
next = inuse.next.primary.load() unmapped := inuse.refMapped.tryUnmap()
// If able to unmap then remove the record from the current Map.
refcount := atomic.LoadInt64(&inuse.refcount) if unmapped {
if refcount > 0 {
checkpointed += m.checkpoint(ctx, inuse)
m.addPrimary(inuse)
continue
}
modified := atomic.LoadInt64(&inuse.modifiedEpoch)
collected := atomic.LoadInt64(&inuse.collectedEpoch)
checkpointed += m.checkpoint(ctx, inuse)
if modified >= collected {
atomic.StoreInt64(&inuse.collectedEpoch, m.currentEpoch)
m.addPrimary(inuse)
continue
}
// Remove this entry.
m.current.Delete(inuse.mapkey()) m.current.Delete(inuse.mapkey())
inuse.next.primary.store(hazardRecord)
} }
for chances := m.records.reclaim.swapNil(); chances != nil; chances = next { // Always report the values if a reference to the Record is active,
atomic.StoreInt64(&chances.collectedEpoch, m.currentEpoch) // this is to keep the previous behavior.
// TODO: Reconsider this logic.
next = chances.next.reclaim.load() if inuse.refMapped.inUse() || atomic.LoadInt64(&inuse.modified) != 0 {
chances.next.reclaim.clear() atomic.StoreInt64(&inuse.modified, 0)
atomic.StoreInt64(&chances.reclaim, 0) checkpointed += m.checkpoint(ctx, inuse)
if chances.next.primary.load() == hazardRecord {
checkpointed += m.checkpoint(ctx, chances)
m.addPrimary(chances)
}
} }
// Always continue to iterate over the entire map.
return true
})
m.currentEpoch++ m.currentEpoch++
return checkpointed return checkpointed
} }
@ -474,29 +411,11 @@ func (r *record) RecordOne(ctx context.Context, number core.Number) {
} }
func (r *record) Unbind() { func (r *record) Unbind() {
for { // Record was modified, inform the Collect() that things need to be collected.
collected := atomic.LoadInt64(&r.collectedEpoch) // TODO: Reconsider if we should marked as modified when an Update happens and
modified := atomic.LoadInt64(&r.modifiedEpoch) // collect only when updates happened even for Bounds.
atomic.StoreInt64(&r.modified, 1)
updated := collected + 1 r.refMapped.unref()
if modified == updated {
// No change
break
}
if !atomic.CompareAndSwapInt64(&r.modifiedEpoch, modified, updated) {
continue
}
if modified < collected {
// This record could have been reclaimed.
r.labels.meter.saveFromReclaim(r)
}
break
}
_ = atomic.AddInt64(&r.refcount, -1)
} }
func (r *record) mapkey() mapkey { func (r *record) mapkey() mapkey {