mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-20 03:30:02 +02:00
9878f3b700
* Add MetricAggregator.Merge() implementations * Update from feedback * Type * Ckpt * Ckpt * Add push controller * Ckpt * Add aggregator interfaces, stdout encoder * Modify basic main.go * Main is working * Batch stdout output * Sum udpate * Rename stdout * Add stateless/stateful Batcher options * Undo a for-loop in the example, remove a done TODO * Update imports * Add note * Rename defaultkeys * Support variable label encoder to speed OpenMetrics/Statsd export * Lint * Doc * Precommit/lint * Simplify Aggregator API * Record->Identifier * Remove export.Record a.k.a. Identifier * Checkpoint * Propagate errors to the SDK, remove a bunch of 'TODO warn' * Checkpoint * Introduce export.Labels * Comments in export/metric.go * Comment * More merge * More doc * Complete example * Lint fixes * Add a testable example * Lint * Let Export return an error * add a basic stdout exporter test * Add measure test; fix aggregator APIs * Use JSON numbers, not strings * Test stdout exporter error * Add a test for the call to RangeTest * Add error handler API to improve correctness test; return errors from RecordOne * Undo the previous -- do not expose errors * Add simple selector variations, test * Repair examples * Test push controller error handling * Add SDK label encoder tests * Add a defaultkeys batcher test * Add an ungrouped batcher test * Lint new tests * Respond to krnowak's feedback * Undo comment * Use concrete receivers for export records and labels, since the constructors return structs not pointers * Bug fix for stateful batchers; clone an aggregator for long term storage * Remove TODO addressed in #318 * Add errors to all aggregator interfaces * Handle ErrNoLastValue case in stdout exporter * Move aggregator API into sdk/export/metric/aggregator * Update all aggregator exported-method comments * Document the aggregator APIs * More aggregator comments * Add multiple updates to the ungrouped test * Fixes for feedback from Gustavo and Liz * Producer->CheckpointSet; add FinishedCollection * Process takes an export.Record * ReadCheckpoint->CheckpointSet * EncodeLabels->Encode * Format a better inconsistent type error; add more aggregator API tests * More RangeTest test coverage * Make benbjohnson/clock a test-only dependency * Handle ErrNoLastValue in stress_test
498 lines
14 KiB
Go
498 lines
14 KiB
Go
// Copyright 2019, OpenTelemetry Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package metric
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"unsafe"
|
|
|
|
"go.opentelemetry.io/otel/api/core"
|
|
"go.opentelemetry.io/otel/api/metric"
|
|
api "go.opentelemetry.io/otel/api/metric"
|
|
export "go.opentelemetry.io/otel/sdk/export/metric"
|
|
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
|
|
)
|
|
|
|
type (
|
|
// SDK implements the OpenTelemetry Meter API. The SDK is
|
|
// bound to a single export.Batcher in `New()`.
|
|
//
|
|
// The SDK supports a Collect() API to gather and export
|
|
// current data. Collect() should be arranged according to
|
|
// the batcher model. Push-based batchers will setup a
|
|
// timer to call Collect() periodically. Pull-based batchers
|
|
// will call Collect() when a pull request arrives.
|
|
SDK struct {
|
|
// current maps `mapkey` to *record.
|
|
current sync.Map
|
|
|
|
// empty is the (singleton) result of Labels()
|
|
// w/ zero arguments.
|
|
empty labels
|
|
|
|
// records is the head of both the primary and the
|
|
// reclaim records lists.
|
|
records doublePtr
|
|
|
|
// currentEpoch is the current epoch number. It is
|
|
// incremented in `Collect()`.
|
|
currentEpoch int64
|
|
|
|
// batcher is the configured batcher+configuration.
|
|
batcher export.Batcher
|
|
|
|
// lencoder determines how labels are uniquely encoded.
|
|
labelEncoder export.LabelEncoder
|
|
|
|
// collectLock prevents simultaneous calls to Collect().
|
|
collectLock sync.Mutex
|
|
|
|
// errorHandler supports delivering errors to the user.
|
|
errorHandler ErrorHandler
|
|
}
|
|
|
|
instrument struct {
|
|
descriptor *export.Descriptor
|
|
meter *SDK
|
|
}
|
|
|
|
// sortedLabels are used to de-duplicate and canonicalize labels.
|
|
sortedLabels []core.KeyValue
|
|
|
|
// labels implements the OpenTelemetry LabelSet API,
|
|
// represents an internalized set of labels that may be used
|
|
// repeatedly.
|
|
labels struct {
|
|
meter *SDK
|
|
sorted sortedLabels
|
|
encoded string
|
|
}
|
|
|
|
// mapkey uniquely describes a metric instrument in terms of
|
|
// its InstrumentID and the encoded form of its LabelSet.
|
|
mapkey struct {
|
|
descriptor *export.Descriptor
|
|
encoded string
|
|
}
|
|
|
|
// record maintains the state of one metric instrument. Due
|
|
// the use of lock-free algorithms, there may be more than one
|
|
// `record` in existence at a time, although at most one can
|
|
// be referenced from the `SDK.current` map.
|
|
record struct {
|
|
// labels is the LabelSet passed by the user.
|
|
labels *labels
|
|
|
|
// descriptor describes the metric instrument.
|
|
descriptor *export.Descriptor
|
|
|
|
// refcount counts the number of active handles on
|
|
// referring to this record. active handles prevent
|
|
// removing the record from the current map.
|
|
refcount int64
|
|
|
|
// collectedEpoch is the epoch number for which this
|
|
// record has been exported. This is modified by the
|
|
// `Collect()` method.
|
|
collectedEpoch int64
|
|
|
|
// modifiedEpoch is the latest epoch number for which
|
|
// this record was updated. Generally, if
|
|
// modifiedEpoch is less than collectedEpoch, this
|
|
// record is due for reclaimation.
|
|
modifiedEpoch int64
|
|
|
|
// reclaim is an atomic to control the start of reclaiming.
|
|
reclaim int64
|
|
|
|
// recorder implements the actual RecordOne() API,
|
|
// depending on the type of aggregation. If nil, the
|
|
// metric was disabled by the exporter.
|
|
recorder export.Aggregator
|
|
|
|
// next contains the next pointer for both the primary
|
|
// and the reclaim lists.
|
|
next doublePtr
|
|
}
|
|
|
|
ErrorHandler func(error)
|
|
|
|
// singlePointer wraps an unsafe.Pointer and supports basic
|
|
// load(), store(), clear(), and swapNil() operations.
|
|
singlePtr struct {
|
|
ptr unsafe.Pointer
|
|
}
|
|
|
|
// doublePtr is used for the head and next links of two lists.
|
|
doublePtr struct {
|
|
primary singlePtr
|
|
reclaim singlePtr
|
|
}
|
|
)
|
|
|
|
var (
|
|
_ api.Meter = &SDK{}
|
|
_ api.LabelSet = &labels{}
|
|
_ api.InstrumentImpl = &instrument{}
|
|
_ api.HandleImpl = &record{}
|
|
|
|
// hazardRecord is used as a pointer value that indicates the
|
|
// value is not included in any list. (`nil` would be
|
|
// ambiguous, since the final element in a list has `nil` as
|
|
// the next pointer).
|
|
hazardRecord = &record{}
|
|
)
|
|
|
|
func (i *instrument) Meter() api.Meter {
|
|
return i.meter
|
|
}
|
|
|
|
func (m *SDK) SetErrorHandler(f ErrorHandler) {
|
|
m.errorHandler = f
|
|
}
|
|
|
|
func (i *instrument) acquireHandle(ls *labels) *record {
|
|
// Create lookup key for sync.Map (one allocation)
|
|
mk := mapkey{
|
|
descriptor: i.descriptor,
|
|
encoded: ls.encoded,
|
|
}
|
|
|
|
if actual, ok := i.meter.current.Load(mk); ok {
|
|
// Existing record case, only one allocation so far.
|
|
rec := actual.(*record)
|
|
atomic.AddInt64(&rec.refcount, 1)
|
|
return rec
|
|
}
|
|
|
|
// There's a memory allocation here.
|
|
rec := &record{
|
|
labels: ls,
|
|
descriptor: i.descriptor,
|
|
refcount: 1,
|
|
collectedEpoch: -1,
|
|
modifiedEpoch: 0,
|
|
recorder: i.meter.batcher.AggregatorFor(i.descriptor),
|
|
}
|
|
|
|
// Load/Store: there's a memory allocation to place `mk` into
|
|
// an interface here.
|
|
if actual, loaded := i.meter.current.LoadOrStore(mk, rec); loaded {
|
|
// Existing record case.
|
|
rec = actual.(*record)
|
|
atomic.AddInt64(&rec.refcount, 1)
|
|
return rec
|
|
}
|
|
|
|
i.meter.addPrimary(rec)
|
|
return rec
|
|
}
|
|
|
|
func (i *instrument) AcquireHandle(ls api.LabelSet) api.HandleImpl {
|
|
labs := i.meter.labsFor(ls)
|
|
return i.acquireHandle(labs)
|
|
}
|
|
|
|
func (i *instrument) RecordOne(ctx context.Context, number core.Number, ls api.LabelSet) {
|
|
ourLs := i.meter.labsFor(ls)
|
|
h := i.acquireHandle(ourLs)
|
|
defer h.Release()
|
|
h.RecordOne(ctx, number)
|
|
}
|
|
|
|
// New constructs a new SDK for the given batcher. This SDK supports
|
|
// only a single batcher.
|
|
//
|
|
// The SDK does not start any background process to collect itself
|
|
// periodically, this responsbility lies with the batcher, typically,
|
|
// depending on the type of export. For example, a pull-based
|
|
// batcher will call Collect() when it receives a request to scrape
|
|
// current metric values. A push-based batcher should configure its
|
|
// own periodic collection.
|
|
func New(batcher export.Batcher, labelEncoder export.LabelEncoder) *SDK {
|
|
m := &SDK{
|
|
batcher: batcher,
|
|
labelEncoder: labelEncoder,
|
|
errorHandler: DefaultErrorHandler,
|
|
}
|
|
m.empty.meter = m
|
|
return m
|
|
}
|
|
|
|
func DefaultErrorHandler(err error) {
|
|
fmt.Fprintln(os.Stderr, "Metrics SDK error:", err)
|
|
}
|
|
|
|
// Labels returns a LabelSet corresponding to the arguments. Passed
|
|
// labels are de-duplicated, with last-value-wins semantics.
|
|
func (m *SDK) Labels(kvs ...core.KeyValue) api.LabelSet {
|
|
// Note: This computes a canonical encoding of the labels to
|
|
// use as a map key. It happens to use the encoding used by
|
|
// statsd for labels, allowing an optimization for statsd
|
|
// batchers. This could be made configurable in the
|
|
// constructor, to support the same optimization for different
|
|
// batchers.
|
|
|
|
// Check for empty set.
|
|
if len(kvs) == 0 {
|
|
return &m.empty
|
|
}
|
|
|
|
ls := &labels{
|
|
meter: m,
|
|
sorted: kvs,
|
|
}
|
|
|
|
// Sort and de-duplicate.
|
|
sort.Stable(&ls.sorted)
|
|
oi := 1
|
|
for i := 1; i < len(ls.sorted); i++ {
|
|
if ls.sorted[i-1].Key == ls.sorted[i].Key {
|
|
ls.sorted[oi-1] = ls.sorted[i]
|
|
continue
|
|
}
|
|
ls.sorted[oi] = ls.sorted[i]
|
|
oi++
|
|
}
|
|
ls.sorted = ls.sorted[0:oi]
|
|
|
|
ls.encoded = m.labelEncoder.Encode(ls.sorted)
|
|
|
|
return ls
|
|
}
|
|
|
|
// labsFor sanitizes the input LabelSet. The input will be rejected
|
|
// if it was created by another Meter instance, for example.
|
|
func (m *SDK) labsFor(ls api.LabelSet) *labels {
|
|
if l, _ := ls.(*labels); l != nil && l.meter == m {
|
|
return l
|
|
}
|
|
return &m.empty
|
|
}
|
|
|
|
func (m *SDK) newInstrument(name string, metricKind export.MetricKind, numberKind core.NumberKind, opts *api.Options) *instrument {
|
|
descriptor := export.NewDescriptor(
|
|
name,
|
|
metricKind,
|
|
opts.Keys,
|
|
opts.Description,
|
|
opts.Unit,
|
|
numberKind,
|
|
opts.Alternate)
|
|
return &instrument{
|
|
descriptor: descriptor,
|
|
meter: m,
|
|
}
|
|
}
|
|
|
|
func (m *SDK) newCounterInstrument(name string, numberKind core.NumberKind, cos ...api.CounterOptionApplier) *instrument {
|
|
opts := api.Options{}
|
|
api.ApplyCounterOptions(&opts, cos...)
|
|
return m.newInstrument(name, export.CounterKind, numberKind, &opts)
|
|
}
|
|
|
|
func (m *SDK) newGaugeInstrument(name string, numberKind core.NumberKind, gos ...api.GaugeOptionApplier) *instrument {
|
|
opts := api.Options{}
|
|
api.ApplyGaugeOptions(&opts, gos...)
|
|
return m.newInstrument(name, export.GaugeKind, numberKind, &opts)
|
|
}
|
|
|
|
func (m *SDK) newMeasureInstrument(name string, numberKind core.NumberKind, mos ...api.MeasureOptionApplier) *instrument {
|
|
opts := api.Options{}
|
|
api.ApplyMeasureOptions(&opts, mos...)
|
|
return m.newInstrument(name, export.MeasureKind, numberKind, &opts)
|
|
}
|
|
|
|
func (m *SDK) NewInt64Counter(name string, cos ...api.CounterOptionApplier) api.Int64Counter {
|
|
return api.WrapInt64CounterInstrument(m.newCounterInstrument(name, core.Int64NumberKind, cos...))
|
|
}
|
|
|
|
func (m *SDK) NewFloat64Counter(name string, cos ...api.CounterOptionApplier) api.Float64Counter {
|
|
return api.WrapFloat64CounterInstrument(m.newCounterInstrument(name, core.Float64NumberKind, cos...))
|
|
}
|
|
|
|
func (m *SDK) NewInt64Gauge(name string, gos ...api.GaugeOptionApplier) api.Int64Gauge {
|
|
return api.WrapInt64GaugeInstrument(m.newGaugeInstrument(name, core.Int64NumberKind, gos...))
|
|
}
|
|
|
|
func (m *SDK) NewFloat64Gauge(name string, gos ...api.GaugeOptionApplier) api.Float64Gauge {
|
|
return api.WrapFloat64GaugeInstrument(m.newGaugeInstrument(name, core.Float64NumberKind, gos...))
|
|
}
|
|
|
|
func (m *SDK) NewInt64Measure(name string, mos ...api.MeasureOptionApplier) api.Int64Measure {
|
|
return api.WrapInt64MeasureInstrument(m.newMeasureInstrument(name, core.Int64NumberKind, mos...))
|
|
}
|
|
|
|
func (m *SDK) NewFloat64Measure(name string, mos ...api.MeasureOptionApplier) api.Float64Measure {
|
|
return api.WrapFloat64MeasureInstrument(m.newMeasureInstrument(name, core.Float64NumberKind, mos...))
|
|
}
|
|
|
|
// saveFromReclaim puts a record onto the "reclaim" list when it
|
|
// detects an attempt to delete the record while it is still in use.
|
|
func (m *SDK) saveFromReclaim(rec *record) {
|
|
for {
|
|
|
|
reclaimed := atomic.LoadInt64(&rec.reclaim)
|
|
if reclaimed != 0 {
|
|
return
|
|
}
|
|
if atomic.CompareAndSwapInt64(&rec.reclaim, 0, 1) {
|
|
break
|
|
}
|
|
}
|
|
|
|
m.addReclaim(rec)
|
|
}
|
|
|
|
// Collect traverses the list of active records and exports data for
|
|
// each active instrument. Collect() may not be called concurrently.
|
|
//
|
|
// During the collection pass, the export.Batcher will receive
|
|
// one Export() call per current aggregation.
|
|
//
|
|
// Returns the number of records that were checkpointed.
|
|
func (m *SDK) Collect(ctx context.Context) int {
|
|
m.collectLock.Lock()
|
|
defer m.collectLock.Unlock()
|
|
|
|
checkpointed := 0
|
|
|
|
var next *record
|
|
for inuse := m.records.primary.swapNil(); inuse != nil; inuse = next {
|
|
next = inuse.next.primary.load()
|
|
|
|
refcount := atomic.LoadInt64(&inuse.refcount)
|
|
|
|
if refcount > 0 {
|
|
checkpointed += m.checkpoint(ctx, inuse)
|
|
m.addPrimary(inuse)
|
|
continue
|
|
}
|
|
|
|
modified := atomic.LoadInt64(&inuse.modifiedEpoch)
|
|
collected := atomic.LoadInt64(&inuse.collectedEpoch)
|
|
checkpointed += m.checkpoint(ctx, inuse)
|
|
|
|
if modified >= collected {
|
|
atomic.StoreInt64(&inuse.collectedEpoch, m.currentEpoch)
|
|
m.addPrimary(inuse)
|
|
continue
|
|
}
|
|
|
|
// Remove this entry.
|
|
m.current.Delete(inuse.mapkey())
|
|
inuse.next.primary.store(hazardRecord)
|
|
}
|
|
|
|
for chances := m.records.reclaim.swapNil(); chances != nil; chances = next {
|
|
atomic.StoreInt64(&chances.collectedEpoch, m.currentEpoch)
|
|
|
|
next = chances.next.reclaim.load()
|
|
chances.next.reclaim.clear()
|
|
atomic.StoreInt64(&chances.reclaim, 0)
|
|
|
|
if chances.next.primary.load() == hazardRecord {
|
|
checkpointed += m.checkpoint(ctx, chances)
|
|
m.addPrimary(chances)
|
|
}
|
|
}
|
|
|
|
m.currentEpoch++
|
|
return checkpointed
|
|
}
|
|
|
|
func (m *SDK) checkpoint(ctx context.Context, r *record) int {
|
|
if r.recorder == nil {
|
|
return 0
|
|
}
|
|
r.recorder.Checkpoint(ctx, r.descriptor)
|
|
labels := export.NewLabels(r.labels.sorted, r.labels.encoded, m.labelEncoder)
|
|
err := m.batcher.Process(ctx, export.NewRecord(r.descriptor, labels, r.recorder))
|
|
|
|
if err != nil {
|
|
m.errorHandler(err)
|
|
}
|
|
return 1
|
|
}
|
|
|
|
// RecordBatch enters a batch of metric events.
|
|
func (m *SDK) RecordBatch(ctx context.Context, ls api.LabelSet, measurements ...api.Measurement) {
|
|
for _, meas := range measurements {
|
|
meas.InstrumentImpl().RecordOne(ctx, meas.Number(), ls)
|
|
}
|
|
}
|
|
|
|
// GetDescriptor returns the descriptor of an instrument, which is not
|
|
// part of the public metric API.
|
|
func (m *SDK) GetDescriptor(inst metric.InstrumentImpl) *export.Descriptor {
|
|
if ii, ok := inst.(*instrument); ok {
|
|
return ii.descriptor
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *record) RecordOne(ctx context.Context, number core.Number) {
|
|
if r.recorder == nil {
|
|
// The instrument is disabled according to the AggregationSelector.
|
|
return
|
|
}
|
|
if err := aggregator.RangeTest(number, r.descriptor); err != nil {
|
|
r.labels.meter.errorHandler(err)
|
|
return
|
|
}
|
|
if err := r.recorder.Update(ctx, number, r.descriptor); err != nil {
|
|
r.labels.meter.errorHandler(err)
|
|
return
|
|
}
|
|
}
|
|
|
|
func (r *record) Release() {
|
|
for {
|
|
collected := atomic.LoadInt64(&r.collectedEpoch)
|
|
modified := atomic.LoadInt64(&r.modifiedEpoch)
|
|
|
|
updated := collected + 1
|
|
|
|
if modified == updated {
|
|
// No change
|
|
break
|
|
}
|
|
if !atomic.CompareAndSwapInt64(&r.modifiedEpoch, modified, updated) {
|
|
continue
|
|
}
|
|
|
|
if modified < collected {
|
|
// This record could have been reclaimed.
|
|
r.labels.meter.saveFromReclaim(r)
|
|
}
|
|
|
|
break
|
|
}
|
|
|
|
_ = atomic.AddInt64(&r.refcount, -1)
|
|
}
|
|
|
|
func (r *record) mapkey() mapkey {
|
|
return mapkey{
|
|
descriptor: r.descriptor,
|
|
encoded: r.labels.encoded,
|
|
}
|
|
}
|