opentelemetry-go (mirror of https://github.com/open-telemetry/opentelemetry-go.git)
Support cumulative, delta, and pass-through exporters (#840)
* Update Process()
* Checkpoint
* Add subtractor; fix test
* Fix all simple integrator tests
* Build the rest (checkpoint)
* Pass all but Prometheus tests
* Precommit pass
* Add aggregation.Kind argument to ExportKindFor
* Remove Subtractor support
* Remove dead test code
* Restore the Subtractor code
* Fix the tests
* Comments
* Add tests for MetricKind
* Add ChangeSign test
* Test ExportKind
* New file
* Rename ChangeSign
* Remove a TODO, add a TODO
* Remove Stateful remnants
* Typo
* Typo
* Test an invalid export kind
* Comments
* Lint
* Apply suggestions from code review

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
@@ -35,3 +35,45 @@ const (
 	// UpDownSumObserverKind indicates a UpDownSumObserver instrument.
 	UpDownSumObserverKind
 )
+
+// Synchronous returns whether this is a synchronous kind of instrument.
+func (k Kind) Synchronous() bool {
+	switch k {
+	case CounterKind, UpDownCounterKind, ValueRecorderKind:
+		return true
+	}
+	return false
+}
+
+// Asynchronous returns whether this is an asynchronous kind of instrument.
+func (k Kind) Asynchronous() bool {
+	return !k.Synchronous()
+}
+
+// Adding returns whether this kind of instrument adds its inputs (as opposed to Grouping).
+func (k Kind) Adding() bool {
+	switch k {
+	case CounterKind, UpDownCounterKind, SumObserverKind, UpDownSumObserverKind:
+		return true
+	}
+	return false
+}
+
+// Grouping returns whether this kind of instrument groups its inputs (as opposed to Adding).
+func (k Kind) Grouping() bool {
+	return !k.Adding()
+}
+
+// Monotonic returns whether this kind of instrument exposes a non-decreasing sum.
+func (k Kind) Monotonic() bool {
+	switch k {
+	case CounterKind, SumObserverKind:
+		return true
+	}
+	return false
+}
+
+// PrecomputedSum returns whether this kind of instrument receives precomputed sums.
+func (k Kind) PrecomputedSum() bool {
+	return k.Adding() && k.Asynchronous()
+}
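Aside (not part of the diff): a minimal sketch of how the new Kind predicates compose, assuming only the api/metric import path used throughout this repository. PrecomputedSum() is defined as Adding() && Asynchronous(), so of the six kinds only SumObserver and UpDownSumObserver report true.

    package main

    import (
    	"fmt"

    	"go.opentelemetry.io/otel/api/metric"
    )

    func main() {
    	for _, k := range []metric.Kind{
    		metric.CounterKind,       // synchronous, adding
    		metric.SumObserverKind,   // asynchronous, adding
    		metric.ValueObserverKind, // asynchronous, grouping
    	} {
    		// Only the adding+asynchronous kind prints precomputedSum=true.
    		fmt.Printf("%v adding=%v async=%v precomputedSum=%v\n",
    			k, k.Adding(), k.Asynchronous(), k.PrecomputedSum())
    	}
    }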
api/metric/kind_test.go (new file, 110 lines)
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/otel/api/metric"
)

var (
	syncKinds = []metric.Kind{
		metric.ValueRecorderKind,
		metric.CounterKind,
		metric.UpDownCounterKind,
	}
	asyncKinds = []metric.Kind{
		metric.ValueObserverKind,
		metric.SumObserverKind,
		metric.UpDownSumObserverKind,
	}
	addingKinds = []metric.Kind{
		metric.CounterKind,
		metric.UpDownCounterKind,
		metric.SumObserverKind,
		metric.UpDownSumObserverKind,
	}
	groupingKinds = []metric.Kind{
		metric.ValueRecorderKind,
		metric.ValueObserverKind,
	}

	monotonicKinds = []metric.Kind{
		metric.CounterKind,
		metric.SumObserverKind,
	}

	nonMonotonicKinds = []metric.Kind{
		metric.UpDownCounterKind,
		metric.UpDownSumObserverKind,
		metric.ValueRecorderKind,
		metric.ValueObserverKind,
	}

	precomputedSumKinds = []metric.Kind{
		metric.SumObserverKind,
		metric.UpDownSumObserverKind,
	}

	nonPrecomputedSumKinds = []metric.Kind{
		metric.CounterKind,
		metric.UpDownCounterKind,
		metric.ValueRecorderKind,
		metric.ValueObserverKind,
	}
)

func TestSynchronous(t *testing.T) {
	for _, k := range syncKinds {
		require.True(t, k.Synchronous())
		require.False(t, k.Asynchronous())
	}
	for _, k := range asyncKinds {
		require.True(t, k.Asynchronous())
		require.False(t, k.Synchronous())
	}
}

func TestGrouping(t *testing.T) {
	for _, k := range groupingKinds {
		require.True(t, k.Grouping())
		require.False(t, k.Adding())
	}
	for _, k := range addingKinds {
		require.True(t, k.Adding())
		require.False(t, k.Grouping())
	}
}

func TestMonotonic(t *testing.T) {
	for _, k := range monotonicKinds {
		require.True(t, k.Monotonic())
	}
	for _, k := range nonMonotonicKinds {
		require.False(t, k.Monotonic())
	}
}

func TestPrecomputedSum(t *testing.T) {
	for _, k := range precomputedSumKinds {
		require.True(t, k.PrecomputedSum())
	}
	for _, k := range nonPrecomputedSumKinds {
		require.False(t, k.PrecomputedSum())
	}
}
@@ -33,6 +33,7 @@ const (
 	// Float64NumberKind means that the Number stores float64.
 	Float64NumberKind
 	// Uint64NumberKind means that the Number stores uint64.
+	// TODO: This can be removed, it's not used.
 	Uint64NumberKind
 )

@@ -107,6 +108,20 @@ func NewUint64Number(u uint64) Number {
 	return NewNumberFromRaw(internal.Uint64ToRaw(u))
 }

+// NewNumberSignChange returns a number with the same magnitude and
+// the opposite sign. `kind` must describe the kind of number in `nn`.
+//
+// Does not change Uint64NumberKind values.
+func NewNumberSignChange(kind NumberKind, nn Number) Number {
+	switch kind {
+	case Int64NumberKind:
+		return NewInt64Number(-nn.AsInt64())
+	case Float64NumberKind:
+		return NewFloat64Number(-nn.AsFloat64())
+	}
+	return nn
+}
+
 // - as x

 // AsNumber gets the Number.
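A short usage sketch for the new helper (assuming only the api/metric package shown above):

    package main

    import (
    	"fmt"

    	"go.opentelemetry.io/otel/api/metric"
    )

    func main() {
    	// NewNumberSignChange flips the sign for int64/float64 kinds.
    	f := metric.NewFloat64Number(2.5)
    	fmt.Println(metric.NewNumberSignChange(metric.Float64NumberKind, f).AsFloat64()) // -2.5

    	i := metric.NewInt64Number(-10)
    	fmt.Println(metric.NewNumberSignChange(metric.Int64NumberKind, i).AsInt64()) // 10
    }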
@@ -15,6 +15,7 @@
 package metric

 import (
+	"math"
 	"testing"
 	"unsafe"

@@ -170,3 +171,45 @@ func TestNumberAsInterface(t *testing.T) {
 	require.Equal(t, 11.11, (&f64).AsInterface(Float64NumberKind).(float64))
 	require.Equal(t, uint64(100), (&u64).AsInterface(Uint64NumberKind).(uint64))
 }
+
+func TestNumberSignChange(t *testing.T) {
+	t.Run("Int64", func(t *testing.T) {
+		posInt := NewInt64Number(10)
+		negInt := NewInt64Number(-10)
+
+		require.Equal(t, posInt, NewNumberSignChange(Int64NumberKind, negInt))
+		require.Equal(t, negInt, NewNumberSignChange(Int64NumberKind, posInt))
+	})
+
+	t.Run("Float64", func(t *testing.T) {
+		posFloat := NewFloat64Number(10)
+		negFloat := NewFloat64Number(-10)
+
+		require.Equal(t, posFloat, NewNumberSignChange(Float64NumberKind, negFloat))
+		require.Equal(t, negFloat, NewNumberSignChange(Float64NumberKind, posFloat))
+	})
+
+	t.Run("Float64Zero", func(t *testing.T) {
+		posFloat := NewFloat64Number(0)
+		negFloat := NewFloat64Number(math.Copysign(0, -1))
+
+		require.Equal(t, posFloat, NewNumberSignChange(Float64NumberKind, negFloat))
+		require.Equal(t, negFloat, NewNumberSignChange(Float64NumberKind, posFloat))
+	})
+
+	t.Run("Float64Inf", func(t *testing.T) {
+		posFloat := NewFloat64Number(math.Inf(+1))
+		negFloat := NewFloat64Number(math.Inf(-1))
+
+		require.Equal(t, posFloat, NewNumberSignChange(Float64NumberKind, negFloat))
+		require.Equal(t, negFloat, NewNumberSignChange(Float64NumberKind, posFloat))
+	})
+
+	t.Run("Float64NaN", func(t *testing.T) {
+		posFloat := NewFloat64Number(math.NaN())
+		negFloat := NewFloat64Number(math.Copysign(math.NaN(), -1))
+
+		require.Equal(t, posFloat, NewNumberSignChange(Float64NumberKind, negFloat))
+		require.Equal(t, negFloat, NewNumberSignChange(Float64NumberKind, posFloat))
+	})
+}
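Why the NaN subtest can use require.Equal at all: Number is a raw-bits (uint64) representation, so equality is bitwise, and the two NaN values differ only in the sign bit. This is a sketch under the assumption (which the test itself relies on) that float negation flips only the sign bit, per IEEE 754:

    package main

    import (
    	"fmt"
    	"math"

    	"go.opentelemetry.io/otel/api/metric"
    )

    func main() {
    	pos := metric.NewFloat64Number(math.NaN())
    	neg := metric.NewFloat64Number(math.Copysign(math.NaN(), -1))

    	// NaN != NaN as floats, but Number compares raw bits, so these
    	// two values differ only in the sign bit...
    	fmt.Println(pos == neg) // false: sign bits differ

    	// ...and a sign change maps one onto the other bit-for-bit.
    	fmt.Println(metric.NewNumberSignChange(metric.Float64NumberKind, neg) == pos) // true
    }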
@@ -50,7 +50,6 @@ func initProvider() (*otlp.Exporter, *push.Controller) {
 	pusher := push.New(
 		simple.NewWithExactDistribution(),
 		exp,
-		push.WithStateful(true),
 		push.WithPeriod(2*time.Second),
 	)
@@ -32,11 +32,9 @@ import (
 	"go.opentelemetry.io/otel/sdk/metric/selector/simple"
 )

-// Exporter is an implementation of metric.Exporter that sends metrics to
-// Prometheus.
-//
-// This exporter supports Prometheus pulls, as such it does not
-// implement the export.Exporter interface.
+// Exporter supports Prometheus pulls. It does not implement the
+// sdk/export/metric.Exporter interface--instead it creates a pull
+// controller and reads the latest checkpointed data on-scrape.
 type Exporter struct {
 	handler http.Handler

@@ -144,20 +142,11 @@ func InstallNewPipeline(config Config, options ...pull.Option) (*Exporter, error) {
 func (e *Exporter) SetController(config Config, options ...pull.Option) {
 	e.lock.Lock()
 	defer e.lock.Unlock()
-	// Prometheus uses a stateful pull controller since instruments are
-	// cumulative and should not be reset after each collection interval.
-	//
-	// Prometheus uses this approach to be resilient to scrape failures.
-	// If a Prometheus server tries to scrape metrics from a host and fails for some reason,
-	// it could try again on the next scrape and no data would be lost, only resolution.
-	//
-	// Gauges (or LastValues) and Summaries are an exception to this and have different behaviors.
-	//
-	// TODO: Prometheus supports "Gauge Histogram" which are
-	// expressed as delta histograms.

 	e.controller = pull.New(
 		simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries),
-		append(options, pull.WithStateful(true))...,
+		e,
+		options...,
 	)
 }

@@ -173,6 +162,15 @@ func (e *Exporter) Controller() *pull.Controller {
 	return e.controller
 }

+func (e *Exporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) export.ExportKind {
+	// NOTE: Summary values should use Delta aggregation, then be
+	// combined into a sliding window, see the TODO below.
+	// NOTE: Prometheus also supports a "GaugeDelta" exposition format,
+	// which is expressed as a delta histogram. Need to understand if this
+	// should be a default behavior for ValueRecorder/ValueObserver.
+	return export.CumulativeExporter
+}
+
 func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	e.handler.ServeHTTP(w, r)
 }
@@ -188,7 +186,7 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {
 	c.exp.lock.RLock()
 	defer c.exp.lock.RUnlock()

-	_ = c.exp.Controller().ForEach(func(record export.Record) error {
+	_ = c.exp.Controller().ForEach(c.exp, func(record export.Record) error {
 		var labelKeys []string
 		mergeLabels(record, &labelKeys, nil)
 		ch <- c.toDesc(record, labelKeys)
@@ -209,7 +207,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
 		global.Handle(err)
 	}

-	err := ctrl.ForEach(func(record export.Record) error {
+	err := ctrl.ForEach(c.exp, func(record export.Record) error {
 		agg := record.Aggregation()
 		numberKind := record.Descriptor().NumberKind()
@@ -26,7 +26,7 @@ import (
 	"go.opentelemetry.io/otel/api/global"
 	"go.opentelemetry.io/otel/api/kv"
 	"go.opentelemetry.io/otel/api/label"

+	"go.opentelemetry.io/otel/api/metric"
 	export "go.opentelemetry.io/otel/sdk/export/metric"
 	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
 	"go.opentelemetry.io/otel/sdk/metric/controller/push"
@@ -132,9 +132,6 @@ func InstallNewPipeline(config Config, options ...push.Option) (*push.Controller, error) {
 // NewExportPipeline sets up a complete export pipeline with the
 // recommended setup, chaining a NewRawExporter into the recommended
 // selectors and integrators.
-//
-// The pipeline is configured with a stateful integrator unless the
-// push.WithStateful(false) option is used.
 func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, error) {
 	exporter, err := NewRawExporter(config)
 	if err != nil {
@@ -143,13 +140,17 @@ func NewExportPipeline(config Config, options ...push.Option) (*push.Controller, error) {
 	pusher := push.New(
 		simple.NewWithExactDistribution(),
 		exporter,
-		append([]push.Option{push.WithStateful(true)}, options...)...,
+		options...,
 	)
 	pusher.Start()

 	return pusher, nil
 }

+func (e *Exporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) export.ExportKind {
+	return export.PassThroughExporter
+}
+
 func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
 	var aggError error
 	var batch expoBatch
@@ -157,7 +158,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
 		ts := time.Now()
 		batch.Timestamp = &ts
 	}
-	aggError = checkpointSet.ForEach(func(record export.Record) error {
+	aggError = checkpointSet.ForEach(e, func(record export.Record) error {
 		desc := record.Descriptor()
 		agg := record.Aggregation()
 		kind := desc.NumberKind()
@@ -108,7 +108,9 @@ func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, l
 	return newAgg, true
 }

-func (p *CheckpointSet) ForEach(f func(export.Record) error) error {
+// ForEach does not use the ExportKindSelector: use a real Integrator to
+// test ExportKind functionality.
+func (p *CheckpointSet) ForEach(_ export.ExportKindSelector, f func(export.Record) error) error {
 	for _, r := range p.updates {
 		if err := f(r); err != nil && !errors.Is(err, aggregation.ErrNoData) {
 			return err
@@ -62,8 +62,8 @@ type result struct {

 // CheckpointSet transforms all records contained in a checkpoint into
 // batched OTLP ResourceMetrics.
-func CheckpointSet(ctx context.Context, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
-	records, errc := source(ctx, cps)
+func CheckpointSet(ctx context.Context, exportSelector export.ExportKindSelector, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) {
+	records, errc := source(ctx, exportSelector, cps)

 	// Start a fixed number of goroutines to transform records.
 	transformed := make(chan result)
@@ -96,14 +96,14 @@ func CheckpointSet(ctx context.Context, cps export.CheckpointSet, numWorkers uint) {
 // source starts a goroutine that sends each one of the Records yielded by
 // the CheckpointSet on the returned chan. Any error encountered will be sent
 // on the returned error chan after seeding is complete.
-func source(ctx context.Context, cps export.CheckpointSet) (<-chan export.Record, <-chan error) {
+func source(ctx context.Context, exportSelector export.ExportKindSelector, cps export.CheckpointSet) (<-chan export.Record, <-chan error) {
 	errc := make(chan error, 1)
 	out := make(chan export.Record)
 	// Seed records into process.
 	go func() {
 		defer close(out)
 		// No select is needed since errc is buffered.
-		errc <- cps.ForEach(func(r export.Record) error {
+		errc <- cps.ForEach(exportSelector, func(r export.Record) error {
 			select {
 			case <-ctx.Done():
 				return ErrContextCanceled
@@ -28,8 +28,10 @@ import (
 	colmetricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1"
 	coltracepb "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/trace/v1"

+	"go.opentelemetry.io/otel/api/metric"
 	"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
 	metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
+	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
 	tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
 )

@@ -238,7 +240,7 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error {
 	}(ctx, cancel)

-	rms, err := transform.CheckpointSet(ctx, cps, e.c.numWorkers)
+	rms, err := transform.CheckpointSet(ctx, e, cps, e.c.numWorkers)
 	if err != nil {
 		return err
 	}
@@ -265,6 +267,10 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error {
 	return nil
 }

+func (e *Exporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) metricsdk.ExportKind {
+	return metricsdk.PassThroughExporter
+}
+
 func (e *Exporter) ExportSpan(ctx context.Context, sd *tracesdk.SpanData) {
 	e.uploadTraces(ctx, []*tracesdk.SpanData{sd})
 }
@@ -31,6 +31,7 @@ import (
 	"go.opentelemetry.io/otel/api/metric"
 	metricapi "go.opentelemetry.io/otel/api/metric"
 	"go.opentelemetry.io/otel/exporters/otlp"
+	metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
 	exporttrace "go.opentelemetry.io/otel/sdk/export/trace"
 	"go.opentelemetry.io/otel/sdk/metric/controller/push"
 	integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
@@ -116,7 +117,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) {
 	}

 	selector := simple.NewWithExactDistribution()
-	integrator := integrator.New(selector, true)
+	integrator := integrator.New(selector, metricsdk.PassThroughExporter)
 	pusher := push.New(integrator, exp)
 	pusher.Start()
@@ -82,7 +82,7 @@ type checkpointSet struct {
 	records []metricsdk.Record
 }

-func (m *checkpointSet) ForEach(fn func(metricsdk.Record) error) error {
+func (m *checkpointSet) ForEach(_ metricsdk.ExportKindSelector, fn func(metricsdk.Record) error) error {
 	for _, r := range m.records {
 		if err := fn(r); err != nil && err != aggregation.ErrNoData {
 			return err
@@ -151,6 +151,7 @@ var (
 	ErrNegativeInput    = fmt.Errorf("negative value is out of range for this instrument")
 	ErrNaNInput         = fmt.Errorf("NaN value is an invalid input")
 	ErrInconsistentType = fmt.Errorf("inconsistent aggregator types")
+	ErrNoSubtraction    = fmt.Errorf("aggregator does not subtract")

 	// ErrNoData is returned when (due to a race with collection)
 	// the Aggregator is check-pointed before the first value is set.
sdk/export/metric/exportkind_string.go (new file, 35 lines)
// Code generated by "stringer -type=ExportKind"; DO NOT EDIT.

package metric

import "strconv"

func _() {
	// An "invalid array index" compiler error signifies that the constant values have changed.
	// Re-run the stringer command to generate them again.
	var x [1]struct{}
	_ = x[CumulativeExporter-1]
	_ = x[DeltaExporter-2]
	_ = x[PassThroughExporter-4]
}

const (
	_ExportKind_name_0 = "CumulativeExporterDeltaExporter"
	_ExportKind_name_1 = "PassThroughExporter"
)

var (
	_ExportKind_index_0 = [...]uint8{0, 18, 31}
)

func (i ExportKind) String() string {
	switch {
	case 1 <= i && i <= 2:
		i -= 1
		return _ExportKind_name_0[_ExportKind_index_0[i]:_ExportKind_index_0[i+1]]
	case i == 4:
		return _ExportKind_name_1
	default:
		return "ExportKind(" + strconv.FormatInt(int64(i), 10) + ")"
	}
}
sdk/export/metric/exportkind_test.go (new file, 64 lines)
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric

import (
	"testing"

	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/otel/api/metric"
	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
)

func TestExportKindIdentity(t *testing.T) {
	akind := aggregation.Kind(0)

	require.Equal(t, CumulativeExporter, CumulativeExporter.ExportKindFor(nil, akind))
	require.Equal(t, DeltaExporter, DeltaExporter.ExportKindFor(nil, akind))
	require.Equal(t, PassThroughExporter, PassThroughExporter.ExportKindFor(nil, akind))
}

func TestExportKindIncludes(t *testing.T) {
	require.True(t, CumulativeExporter.Includes(CumulativeExporter))
	require.True(t, DeltaExporter.Includes(CumulativeExporter|DeltaExporter))
	require.False(t, DeltaExporter.Includes(PassThroughExporter|CumulativeExporter))
}

var deltaMemoryKinds = []metric.Kind{
	metric.SumObserverKind,
	metric.UpDownSumObserverKind,
}

var cumulativeMemoryKinds = []metric.Kind{
	metric.ValueRecorderKind,
	metric.ValueObserverKind,
	metric.CounterKind,
	metric.UpDownCounterKind,
}

func TestExportKindMemoryRequired(t *testing.T) {
	for _, kind := range deltaMemoryKinds {
		require.True(t, DeltaExporter.MemoryRequired(kind))
		require.False(t, CumulativeExporter.MemoryRequired(kind))
		require.False(t, PassThroughExporter.MemoryRequired(kind))
	}

	for _, kind := range cumulativeMemoryKinds {
		require.True(t, CumulativeExporter.MemoryRequired(kind))
		require.False(t, DeltaExporter.MemoryRequired(kind))
		require.False(t, PassThroughExporter.MemoryRequired(kind))
	}
}
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+//go:generate stringer -type=ExportKind
+
 package metric // import "go.opentelemetry.io/otel/sdk/export/metric"

 import (
@@ -154,6 +156,16 @@ type Aggregator interface {
 	Merge(Aggregator, *metric.Descriptor) error
 }

+// Subtractor is an optional interface implemented by some
+// Aggregators. An Aggregator must support `Subtract()` in order to
+// be configured for a Precomputed-Sum instrument (SumObserver,
+// UpDownSumObserver) using a DeltaExporter.
+type Subtractor interface {
+	// Subtract subtracts the `operand` from this Aggregator and
+	// outputs the value in `result`.
+	Subtract(operand, result Aggregator, descriptor *metric.Descriptor) error
+}
+
 // Exporter handles presentation of the checkpoint of aggregate
 // metrics. This is the final stage of a metrics export pipeline,
 // where metric data are formatted for a specific system.
@@ -167,6 +179,21 @@ type Exporter interface {
 	// The CheckpointSet interface refers to the Integrator that just
 	// completed collection.
 	Export(context.Context, CheckpointSet) error
+
+	// ExportKindSelector is an interface used by the Integrator
+	// in deciding whether to compute Delta or Cumulative
+	// Aggregations when passing Records to this Exporter.
+	ExportKindSelector
 }

+// ExportKindSelector is a sub-interface of Exporter used to indicate
+// whether the Integrator should compute Delta or Cumulative
+// Aggregations.
+type ExportKindSelector interface {
+	// ExportKindFor should return the correct ExportKind that
+	// should be used when exporting data for the given metric
+	// instrument and Aggregator kind.
+	ExportKindFor(*metric.Descriptor, aggregation.Kind) ExportKind
+}
|
||||
@ -178,11 +205,16 @@ type CheckpointSet interface {
|
||||
// metrics that were updated during the last collection
|
||||
// period. Each aggregated checkpoint returned by the
|
||||
// function parameter may return an error.
|
||||
//
|
||||
// The ExportKindSelector argument is used to determine
|
||||
// whether the Record is computed using Delta or Cumulative
|
||||
// aggregation.
|
||||
//
|
||||
// ForEach tolerates ErrNoData silently, as this is
|
||||
// expected from the Meter implementation. Any other kind
|
||||
// of error will immediately halt ForEach and return
|
||||
// the error to the caller.
|
||||
ForEach(func(Record) error) error
|
||||
ForEach(ExportKindSelector, func(Record) error) error
|
||||
|
||||
// Locker supports locking the checkpoint set. Collection
|
||||
// into the checkpoint set cannot take place (in case of a
|
||||
@@ -292,3 +324,52 @@ func (r Record) StartTime() time.Time {

 func (r Record) EndTime() time.Time {
 	return r.end
 }
+
+// ExportKind indicates the kind of data exported by an exporter.
+// These bits may be OR-d together when multiple exporters are in use.
+type ExportKind int
+
+const (
+	// CumulativeExporter indicates that the Exporter expects a
+	// Cumulative Aggregation.
+	CumulativeExporter ExportKind = 1 // e.g., Prometheus
+
+	// DeltaExporter indicates that the Exporter expects a
+	// Delta Aggregation.
+	DeltaExporter ExportKind = 2 // e.g., StatsD
+
+	// PassThroughExporter indicates that the Exporter expects
+	// either a Cumulative or a Delta Aggregation, whichever does
+	// not require maintaining state for the given instrument.
+	PassThroughExporter ExportKind = 4 // e.g., OTLP
+)
+
+// Includes tests whether `kind` includes a specific kind of
+// exporter.
+func (kind ExportKind) Includes(has ExportKind) bool {
+	return kind&has != 0
+}
+
+// ExportKindFor returns a constant, as an implementation of ExportKindSelector.
+func (kind ExportKind) ExportKindFor(_ *metric.Descriptor, _ aggregation.Kind) ExportKind {
+	return kind
+}
+
+// MemoryRequired returns whether an exporter of this kind requires
+// memory to export correctly.
+func (kind ExportKind) MemoryRequired(mkind metric.Kind) bool {
+	switch mkind {
+	case metric.ValueRecorderKind, metric.ValueObserverKind,
+		metric.CounterKind, metric.UpDownCounterKind:
+		// Delta-oriented instruments:
+		return kind.Includes(CumulativeExporter)
+
+	case metric.SumObserverKind, metric.UpDownSumObserverKind:
+		// Cumulative-oriented instruments:
+		return kind.Includes(DeltaExporter)
+	}
+	// Something unexpected is happening--we could panic. This
+	// will become an error when the exporter tries to access a
+	// checkpoint, presumably, so let it be.
+	return false
+}
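The constants are deliberately powers of two, so the demands of several exporters can be OR-ed together and queried with Includes; MemoryRequired then tells the integrator whether it must keep state for an instrument. A small sketch using the package paths from this PR:

    package main

    import (
    	"fmt"

    	"go.opentelemetry.io/otel/api/metric"
    	export "go.opentelemetry.io/otel/sdk/export/metric"
    )

    func main() {
    	// Two exporters in use: one cumulative (Prometheus-style), one delta.
    	combined := export.CumulativeExporter | export.DeltaExporter
    	fmt.Println(combined.Includes(export.DeltaExporter)) // true

    	// A Counter accumulates deltas, so a cumulative exporter forces the
    	// integrator to remember a running sum:
    	fmt.Println(export.CumulativeExporter.MemoryRequired(metric.CounterKind)) // true

    	// A SumObserver reports cumulative values, so only a delta exporter
    	// needs memory (the previous value, for subtraction):
    	fmt.Println(export.DeltaExporter.MemoryRequired(metric.SumObserverKind))       // true
    	fmt.Println(export.PassThroughExporter.MemoryRequired(metric.SumObserverKind)) // false
    }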
@@ -43,6 +43,9 @@ var _ aggregation.Distribution = &Aggregator{}

 // New returns a new DDSketch aggregator.
 func New(cnt int, desc *metric.Descriptor, cfg *Config) []Aggregator {
+	if cfg == nil {
+		cfg = NewDefaultConfig()
+	}
 	aggs := make([]Aggregator, cnt)
 	for i := range aggs {
 		aggs[i] = Aggregator{
@@ -31,6 +31,7 @@ type Aggregator struct {
 }

 var _ export.Aggregator = &Aggregator{}
+var _ export.Subtractor = &Aggregator{}
 var _ aggregation.Sum = &Aggregator{}

 // New returns a new counter aggregator implemented by atomic
@@ -82,3 +83,19 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error {
 	c.value.AddNumber(desc.NumberKind(), o.value)
 	return nil
 }
+
+func (c *Aggregator) Subtract(opAgg, resAgg export.Aggregator, descriptor *metric.Descriptor) error {
+	op, _ := opAgg.(*Aggregator)
+	if op == nil {
+		return aggregator.NewInconsistentAggregatorError(c, opAgg)
+	}
+
+	res, _ := resAgg.(*Aggregator)
+	if res == nil {
+		return aggregator.NewInconsistentAggregatorError(c, resAgg)
+	}
+
+	res.value = c.value
+	res.value.AddNumber(descriptor.NumberKind(), metric.NewNumberSignChange(descriptor.NumberKind(), op.value))
+	return nil
+}
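A sketch of the new Subtract in isolation: computing the delta between two checkpointed sums, as the integrator does for precomputed-sum instruments. It assumes only packages touched by this PR:

    package main

    import (
    	"context"
    	"fmt"

    	"go.opentelemetry.io/otel/api/metric"
    	export "go.opentelemetry.io/otel/sdk/export/metric"
    	"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
    )

    func main() {
    	ctx := context.Background()
    	desc := metric.NewDescriptor("example.sum", metric.SumObserverKind, metric.Int64NumberKind)

    	aggs := sum.New(3)
    	current, previous, delta := &aggs[0], &aggs[1], &aggs[2]

    	_ = previous.Update(ctx, metric.NewInt64Number(10), &desc)
    	_ = current.Update(ctx, metric.NewInt64Number(30), &desc)

    	// delta = current - previous, via the optional Subtractor interface.
    	var sub export.Subtractor = current
    	if err := sub.Subtract(previous, delta, &desc); err != nil {
    		panic(err)
    	}
    	v, _ := delta.Sum()
    	fmt.Println(v.AsInt64()) // 20
    }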
@@ -27,11 +27,6 @@ type Config struct {
 	// created by the Controller.
 	Resource *resource.Resource

-	// Stateful causes the controller to maintain state across
-	// collection events, so that records in the exported
-	// checkpoint set are cumulative.
-	Stateful bool
-
 	// CachePeriod is the period which a recently-computed result
 	// will be returned without gathering metric data again.
 	//
@@ -57,17 +52,6 @@ func (o resourceOption) Apply(config *Config) {
 	config.Resource = o.Resource
 }

-// WithStateful sets the Stateful configuration option of a Config.
-func WithStateful(stateful bool) Option {
-	return statefulOption(stateful)
-}
-
-type statefulOption bool
-
-func (o statefulOption) Apply(config *Config) {
-	config.Stateful = bool(o)
-}
-
 // WithCachePeriod sets the CachePeriod configuration option of a Config.
 func WithCachePeriod(cachePeriod time.Duration) Option {
 	return cachePeriodOption(cachePeriod)
@@ -45,7 +45,7 @@ type Controller struct {
 }

 // New returns a *Controller configured with an aggregation selector and options.
-func New(selector export.AggregationSelector, options ...Option) *Controller {
+func New(aselector export.AggregationSelector, eselector export.ExportKindSelector, options ...Option) *Controller {
 	config := &Config{
 		Resource:    resource.Empty(),
 		CachePeriod: DefaultCachePeriod,
@@ -53,7 +53,7 @@ func New(selector export.AggregationSelector, options ...Option) *Controller {
 	for _, opt := range options {
 		opt.Apply(config)
 	}
-	integrator := integrator.New(selector, config.Stateful)
+	integrator := integrator.New(aselector, eselector)
 	accum := sdk.NewAccumulator(
 		integrator,
 		sdk.WithResource(config.Resource),
@@ -83,11 +83,11 @@ func (c *Controller) Provider() metric.Provider {

 // ForEach gives the caller read-locked access to the current
 // export.CheckpointSet.
-func (c *Controller) ForEach(f func(export.Record) error) error {
+func (c *Controller) ForEach(ks export.ExportKindSelector, f func(export.Record) error) error {
 	c.integrator.RLock()
 	defer c.integrator.RUnlock()

-	return c.checkpoint.ForEach(f)
+	return c.checkpoint.ForEach(ks, f)
 }

 // Collect requests a collection. The collection will be skipped if
@@ -25,6 +25,7 @@ import (
 	"go.opentelemetry.io/otel/api/kv"
 	"go.opentelemetry.io/otel/api/label"
 	"go.opentelemetry.io/otel/api/metric"
+	export "go.opentelemetry.io/otel/sdk/export/metric"
 	"go.opentelemetry.io/otel/sdk/metric/controller/pull"
 	controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test"
 	"go.opentelemetry.io/otel/sdk/metric/integrator/test"
@@ -34,8 +35,8 @@ import (
 func TestPullNoCache(t *testing.T) {
 	puller := pull.New(
 		selector.NewWithExactDistribution(),
+		export.CumulativeExporter,
 		pull.WithCachePeriod(0),
-		pull.WithStateful(true),
 	)

 	ctx := context.Background()
@@ -46,7 +47,7 @@ func TestPullNoCache(t *testing.T) {

 	require.NoError(t, puller.Collect(ctx))
 	records := test.NewOutput(label.DefaultEncoder())
-	require.NoError(t, puller.ForEach(records.AddRecord))
+	require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

 	require.EqualValues(t, map[string]float64{
 		"counter/A=B/": 10,
@@ -56,7 +57,7 @@ func TestPullNoCache(t *testing.T) {

 	require.NoError(t, puller.Collect(ctx))
 	records = test.NewOutput(label.DefaultEncoder())
-	require.NoError(t, puller.ForEach(records.AddRecord))
+	require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

 	require.EqualValues(t, map[string]float64{
 		"counter/A=B/": 20,
@@ -66,8 +67,8 @@ func TestPullNoCache(t *testing.T) {
 func TestPullWithCache(t *testing.T) {
 	puller := pull.New(
 		selector.NewWithExactDistribution(),
+		export.CumulativeExporter,
 		pull.WithCachePeriod(time.Second),
-		pull.WithStateful(true),
 	)
 	mock := controllerTest.NewMockClock()
 	puller.SetClock(mock)
@@ -80,7 +81,7 @@ func TestPullWithCache(t *testing.T) {

 	require.NoError(t, puller.Collect(ctx))
 	records := test.NewOutput(label.DefaultEncoder())
-	require.NoError(t, puller.ForEach(records.AddRecord))
+	require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

 	require.EqualValues(t, map[string]float64{
 		"counter/A=B/": 10,
@@ -91,7 +92,7 @@ func TestPullWithCache(t *testing.T) {
 	// Cached value!
 	require.NoError(t, puller.Collect(ctx))
 	records = test.NewOutput(label.DefaultEncoder())
-	require.NoError(t, puller.ForEach(records.AddRecord))
+	require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

 	require.EqualValues(t, map[string]float64{
 		"counter/A=B/": 10,
@@ -103,7 +104,7 @@ func TestPullWithCache(t *testing.T) {
 	// Re-computed value!
 	require.NoError(t, puller.Collect(ctx))
 	records = test.NewOutput(label.DefaultEncoder())
-	require.NoError(t, puller.ForEach(records.AddRecord))
+	require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

 	require.EqualValues(t, map[string]float64{
 		"counter/A=B/": 20,
@@ -26,11 +26,6 @@ type Config struct {
 	// created by the Controller.
 	Resource *resource.Resource

-	// Stateful causes the controller to maintain state across
-	// collection events, so that records in the exported
-	// checkpoint set are cumulative.
-	Stateful bool
-
 	// Period is the interval between calls to Collect a checkpoint.
 	Period time.Duration

@@ -57,17 +52,6 @@ func (o resourceOption) Apply(config *Config) {
 	config.Resource = o.Resource
 }

-// WithStateful sets the Stateful configuration option of a Config.
-func WithStateful(stateful bool) Option {
-	return statefulOption(stateful)
-}
-
-type statefulOption bool
-
-func (o statefulOption) Apply(config *Config) {
-	config.Stateful = bool(o)
-}
-
 // WithPeriod sets the Period configuration option of a Config.
 func WithPeriod(period time.Duration) Option {
 	return periodOption(period)
@@ -60,7 +60,7 @@ func New(selector export.AggregationSelector, exporter export.Exporter, opts ...Option) *Controller {
 		c.Timeout = c.Period
 	}

-	integrator := simple.New(selector, c.Stateful)
+	integrator := simple.New(selector, exporter)
 	impl := sdk.NewAccumulator(
 		integrator,
 		sdk.WithResource(c.Resource),
@@ -91,12 +91,16 @@ func newFixture(t *testing.T) testFixture {
 	}
 }

+func (e *testExporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) export.ExportKind {
+	return export.PassThroughExporter
+}
+
 func (e *testExporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
 	e.lock.Lock()
 	defer e.lock.Unlock()
 	e.exports++
 	var records []export.Record
-	if err := checkpointSet.ForEach(func(r export.Record) error {
+	if err := checkpointSet.ForEach(e, func(r export.Record) error {
 		if e.injectErr != nil {
 			if err := e.injectErr(r); err != nil {
 				return err
@@ -29,31 +29,47 @@ import (

 type (
 	Integrator struct {
+		export.ExportKindSelector
 		export.AggregationSelector
-		stateful bool
-		batch
+
+		state
 	}

-	batchKey struct {
+	stateKey struct {
 		descriptor *metric.Descriptor
 		distinct   label.Distinct
 		resource   label.Distinct
 	}

-	batchValue struct {
-		aggregator export.Aggregator
-		labels     *label.Set
-		resource   *resource.Resource
+	stateValue struct {
+		// labels corresponds to the stateKey.distinct field.
+		labels *label.Set
+
+		// resource corresponds to the stateKey.resource field.
+		resource *resource.Resource
+
+		// updated indicates the last sequence number when this value had
+		// Process() called by an accumulator.
+		updated int64
+
+		// stateful indicates that a cumulative aggregation is
+		// being maintained, taken from the process start time.
+		stateful bool
+
+		current    export.Aggregator // refers to single-accumulator checkpoint or delta.
+		delta      export.Aggregator // owned if multi accumulator else nil.
+		cumulative export.Aggregator // owned if stateful else nil.
 	}

-	batch struct {
+	state struct {
 		// RWMutex implements locking for the `CheckpointSet` interface.
 		sync.RWMutex
-		values map[batchKey]batchValue
+		values map[stateKey]*stateValue

-		// Note: the timestamp logic currently assumes all
-		// exports are deltas.
-
+		processStart  time.Time
 		intervalStart time.Time
 		intervalEnd   time.Time
@@ -68,97 +84,271 @@ type (
 )

 var _ export.Integrator = &Integrator{}
-var _ export.CheckpointSet = &batch{}
+var _ export.CheckpointSet = &state{}
 var ErrInconsistentState = fmt.Errorf("inconsistent integrator state")
+var ErrInvalidExporterKind = fmt.Errorf("invalid exporter kind")

-func New(selector export.AggregationSelector, stateful bool) *Integrator {
+// New returns a basic Integrator using the provided
+// AggregationSelector to select Aggregators. The ExportKindSelector
+// is consulted to determine the kind(s) of exporter that will consume
+// data, so that this Integrator can prepare to compute Delta or
+// Cumulative Aggregations as needed.
+func New(aselector export.AggregationSelector, eselector export.ExportKindSelector) *Integrator {
+	now := time.Now()
 	return &Integrator{
-		AggregationSelector: selector,
-		stateful:            stateful,
-		batch: batch{
-			values:        map[batchKey]batchValue{},
-			intervalStart: time.Now(),
+		AggregationSelector: aselector,
+		ExportKindSelector:  eselector,
+		state: state{
+			values:        map[stateKey]*stateValue{},
+			processStart:  now,
+			intervalStart: now,
 		},
 	}
 }
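How the rewritten pieces wire together end to end (a sketch; package aliases follow the files changed in this PR):

    package main

    import (
    	"fmt"

    	export "go.opentelemetry.io/otel/sdk/export/metric"
    	integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
    	selector "go.opentelemetry.io/otel/sdk/metric/selector/simple"
    )

    func main() {
    	// The integrator now takes an ExportKindSelector instead of a
    	// `stateful` flag; a bare ExportKind works because ExportKind
    	// itself implements ExportKindFor.
    	integ := integrator.New(selector.NewWithExactDistribution(), export.CumulativeExporter)

    	integ.StartCollection()
    	// ... one or more Accumulators call integ.Process(accumulation) ...
    	if err := integ.FinishCollection(); err != nil {
    		panic(err)
    	}

    	// ForEach takes the selector again, so the reader states which
    	// aggregation temporality it wants for each record.
    	_ = integ.CheckpointSet().ForEach(export.CumulativeExporter, func(r export.Record) error {
    		fmt.Println(r.Descriptor().Name(), r.Aggregation())
    		return nil
    	})
    }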
-func (b *Integrator) Process(accumulation export.Accumulation) error {
+// Process implements export.Integrator.
+func (b *Integrator) Process(accum export.Accumulation) error {
 	if b.startedCollection != b.finishedCollection+1 {
 		return ErrInconsistentState
 	}

-	desc := accumulation.Descriptor()
-	key := batchKey{
+	desc := accum.Descriptor()
+	key := stateKey{
 		descriptor: desc,
-		distinct:   accumulation.Labels().Equivalent(),
-		resource:   accumulation.Resource().Equivalent(),
+		distinct:   accum.Labels().Equivalent(),
+		resource:   accum.Resource().Equivalent(),
 	}
-	agg := accumulation.Aggregator()
-	value, ok := b.batch.values[key]
-	if ok {
-		// Note: The call to Merge here combines only
-		// identical accumulations. It is required even for a
-		// stateless Integrator because such identical accumulations
-		// may arise in the Meter implementation due to race
-		// conditions.
-		return value.aggregator.Merge(agg, desc)
-	}
-	// If this integrator is stateful, create a copy of the
-	// Aggregator for long-term storage. Otherwise the
-	// Meter implementation will checkpoint the aggregator
-	// again, overwriting the long-lived state.
-	if b.stateful {
-		tmp := agg
-		// Note: the call to AggregatorFor() followed by Merge
-		// is effectively a Clone() operation.
-		b.AggregatorFor(desc, &agg)
-		if err := agg.Merge(tmp, desc); err != nil {
-			return err
-		}
-	}
-	b.batch.values[key] = batchValue{
-		aggregator: agg,
-		labels:     accumulation.Labels(),
-		resource:   accumulation.Resource(),
-	}
-	return nil
+	agg := accum.Aggregator()
+
+	// Check if there is an existing value.
+	value, ok := b.state.values[key]
+	if !ok {
+		stateful := b.ExportKindFor(desc, agg.Aggregation().Kind()).MemoryRequired(desc.MetricKind())
+
+		newValue := &stateValue{
+			labels:   accum.Labels(),
+			resource: accum.Resource(),
+			updated:  b.state.finishedCollection,
+			stateful: stateful,
+			current:  agg,
+		}
+		if stateful {
+			if desc.MetricKind().PrecomputedSum() {
+				// If we know we need to compute deltas, allocate two aggregators.
+				b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta)
+			} else {
+				// In this case we are not certain to need a delta, only allocate a
+				// cumulative aggregator. We _may_ need a delta accumulator if
+				// multiple synchronous Accumulators produce an Accumulation (handled
+				// below), which requires merging them into a temporary Aggregator.
+				b.AggregatorFor(desc, &newValue.cumulative)
+			}
+		}
+		b.state.values[key] = newValue
+		return nil
+	}
+
+	// Advance the update sequence number.
+	sameCollection := b.state.finishedCollection == value.updated
+	value.updated = b.state.finishedCollection
+
+	// An existing value will be found for some stateKey when:
+	// (a) stateful aggregation is being used
+	// (b) multiple accumulators are being used.
+	//
+	// Case (a) occurs when the instrument and the exporter
+	// require memory to work correctly, either because the
+	// instrument reports a PrecomputedSum to a DeltaExporter or
+	// the reverse, a non-PrecomputedSum instrument with a
+	// CumulativeExporter. This logic is encapsulated in
+	// ExportKind.MemoryRequired(MetricKind).
+	//
+	// Case (b) occurs when the variable `sameCollection` is true,
+	// indicating that the stateKey for Accumulation has already
+	// been seen in the same collection. When this happens, it
+	// implies that multiple Accumulators are being used because
+	// the Accumulator outputs a maximum of one Accumulation per
+	// instrument and label set.
+	//
+	// The following logic distinguishes between asynchronous and
+	// synchronous instruments in order to ensure that the use of
+	// multiple Accumulators does not change instrument semantics.
+	// To maintain the instrument semantics, multiple synchronous
+	// Accumulations should be merged, whereas when multiple
+	// asynchronous Accumulations are processed, the last value
+	// should be kept.
+
+	if !sameCollection {
+		// This is the first Accumulation we've seen for this
+		// stateKey during this collection. Just keep a
+		// reference to the Accumulator's Aggregator.
+		value.current = agg
+		return nil
+	}
+	if desc.MetricKind().Asynchronous() {
+		// The last value across multiple accumulators is taken.
+		// Just keep a reference to the Accumulator's Aggregator.
+		value.current = agg
+		return nil
+	}
+
+	// The above two cases are keeping a reference to the
+	// Accumulator's Aggregator. The remaining cases address
+	// synchronous instruments, which always merge multiple
+	// Accumulations using `value.delta` for temporary storage.
+
+	if value.delta == nil {
+		// The temporary `value.delta` may have been allocated
+		// already, either in a prior pass through this block of
+		// code or in the `!ok` branch above. It would be
+		// allocated in the `!ok` branch if this is a stateful
+		// PrecomputedSum instrument (in which case the exporter
+		// is requesting a delta so we allocate it up front),
+		// and it would be allocated in this block when multiple
+		// accumulators are used and the first condition is not
+		// met.
+		b.AggregationSelector.AggregatorFor(desc, &value.delta)
+	}
+	if value.current != value.delta {
+		// If the current and delta Aggregators are not the same it
+		// implies that multiple Accumulators were used. The first
+		// Accumulation seen for a given stateKey will return in
+		// one of the cases above after assigning `value.current
+		// = agg` (i.e., after taking a reference to the
+		// Accumulator's Aggregator).
+		//
+		// The second time through this branch copies the
+		// Accumulator's Aggregator into `value.delta` and sets
+		// `value.current` appropriately to avoid this branch if
+		// a third Accumulator is used.
+		err := value.current.SynchronizedCopy(value.delta, desc)
+		if err != nil {
+			return err
+		}
+		value.current = value.delta
+	}
+
+	// The two statements above ensure that `value.current` refers
+	// to `value.delta` and not to an Accumulator's Aggregator. Now
+	// combine this Accumulation with the prior Accumulation.
+	return value.delta.Merge(agg, desc)
 }
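The long comment in Process distinguishes synchronous merging from asynchronous last-value semantics. A compressed sketch of case (b), two Accumulators reporting the same synchronous counter in one collection (the aggWith helper is local to this example, not from the SDK):

    package main

    import (
    	"context"
    	"fmt"

    	"go.opentelemetry.io/otel/api/kv"
    	"go.opentelemetry.io/otel/api/label"
    	"go.opentelemetry.io/otel/api/metric"
    	export "go.opentelemetry.io/otel/sdk/export/metric"
    	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
    	"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
    	integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
    	selector "go.opentelemetry.io/otel/sdk/metric/selector/simple"
    	"go.opentelemetry.io/otel/sdk/resource"
    )

    func main() {
    	ctx := context.Background()
    	desc := metric.NewDescriptor("requests", metric.CounterKind, metric.Int64NumberKind)
    	labels := label.NewSet(kv.String("host", "a"))
    	res := resource.New(kv.String("R", "V"))

    	// aggWith stands in for what a single Accumulator checkpoints.
    	aggWith := func(v int64) export.Aggregator {
    		agg := &sum.New(1)[0]
    		_ = agg.Update(ctx, metric.NewInt64Number(v), &desc)
    		return agg
    	}

    	integ := integrator.New(selector.NewWithExactDistribution(), export.DeltaExporter)
    	integ.StartCollection()
    	_ = integ.Process(export.NewAccumulation(&desc, &labels, res, aggWith(5))) // kept by reference
    	_ = integ.Process(export.NewAccumulation(&desc, &labels, res, aggWith(7))) // merged via value.delta
    	_ = integ.FinishCollection()

    	// A synchronous counter yields one record with the merged sum (12);
    	// an asynchronous observer would instead keep the last value (7).
    	_ = integ.CheckpointSet().ForEach(export.DeltaExporter, func(r export.Record) error {
    		s, _ := r.Aggregation().(aggregation.Sum).Sum()
    		fmt.Println(r.Descriptor().Name(), s.AsInt64()) // requests 12
    		return nil
    	})
    }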
 // CheckpointSet returns the associated CheckpointSet. Use the
 // CheckpointSet Locker interface to synchronize access to this
 // object. The CheckpointSet.ForEach() method cannot be called
 // concurrently with Process().
 func (b *Integrator) CheckpointSet() export.CheckpointSet {
-	return &b.batch
+	return &b.state
 }

+// StartCollection signals to the Integrator that one or more
+// Accumulators will begin calling Process() during collection.
 func (b *Integrator) StartCollection() {
 	if b.startedCollection != 0 {
 		b.intervalStart = b.intervalEnd
 	}
 	b.startedCollection++
-	if !b.stateful {
-		b.batch.values = map[batchKey]batchValue{}
-	}
 }

+// FinishCollection signals to the Integrator that a complete
+// collection has finished and that ForEach will be called to access
+// the CheckpointSet.
 func (b *Integrator) FinishCollection() error {
-	b.finishedCollection++
 	b.intervalEnd = time.Now()
-	if b.startedCollection != b.finishedCollection {
+	if b.startedCollection != b.finishedCollection+1 {
 		return ErrInconsistentState
 	}
+	defer func() { b.finishedCollection++ }()
+
+	for key, value := range b.values {
+		mkind := key.descriptor.MetricKind()
+
+		if !value.stateful {
+			if value.updated != b.finishedCollection {
+				delete(b.values, key)
+			}
+			continue
+		}
+
+		// Update Aggregator state to support exporting either a
+		// delta or a cumulative aggregation.
+		var err error
+		if mkind.PrecomputedSum() {
+			// delta_value = current_cumulative_value - previous_cumulative_value
+			if subt, ok := value.current.(export.Subtractor); ok {
+				err = subt.Subtract(value.cumulative, value.delta, key.descriptor)
+
+				if err == nil {
+					err = value.current.SynchronizedCopy(value.cumulative, key.descriptor)
+				}
+			} else {
+				err = aggregation.ErrNoSubtraction
+			}
+		} else {
+			// cumulative_value = previous_cumulative_value + current_delta_value
+			err = value.cumulative.Merge(value.current, key.descriptor)
+		}
+		if err != nil {
+			return err
+		}
+	}
+	return nil
 }
-func (b *batch) ForEach(f func(export.Record) error) error {
+// ForEach iterates through the CheckpointSet, passing an
+// export.Record with the appropriate Cumulative or Delta aggregation
+// to an exporter.
+func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record) error) error {
+	if b.startedCollection != b.finishedCollection {
+		return ErrInconsistentState
+	}
+
 	for key, value := range b.values {
+		mkind := key.descriptor.MetricKind()
+
+		var agg aggregation.Aggregation
+		var start time.Time
+
+		ekind := exporter.ExportKindFor(key.descriptor, value.current.Aggregation().Kind())
+		switch ekind {
+		case export.PassThroughExporter:
+			// No state is required, pass through the checkpointed value.
+			agg = value.current.Aggregation()
+
+			if mkind.PrecomputedSum() {
+				start = b.processStart
+			} else {
+				start = b.intervalStart
+			}
+
+		case export.CumulativeExporter:
+			// If stateful, the sum has been computed. If stateless, the
+			// input was already cumulative. Either way, use the checkpointed
+			// value:
+			if value.stateful {
+				agg = value.cumulative.Aggregation()
+			} else {
+				agg = value.current.Aggregation()
+			}
+			start = b.processStart
+
+		case export.DeltaExporter:
+			// Precomputed sums are a special case.
+			if mkind.PrecomputedSum() {
+				agg = value.delta.Aggregation()
+			} else {
+				agg = value.current.Aggregation()
+			}
+			start = b.intervalStart
+
+		default:
+			return fmt.Errorf("%v: %w", ekind, ErrInvalidExporterKind)
+		}
+
 		if err := f(export.NewRecord(
 			key.descriptor,
 			value.labels,
 			value.resource,
-			value.aggregator.Aggregation(),
-			b.intervalStart,
+			agg,
+			start,
 			b.intervalEnd,
 		)); err != nil && !errors.Is(err, aggregation.ErrNoData) {
 			return err
@@ -16,6 +16,8 @@ package simple_test

 import (
 	"context"
+	"errors"
+	"fmt"
 	"testing"
 	"time"

@@ -24,248 +26,306 @@ import (
 	"go.opentelemetry.io/otel/api/kv"
 	"go.opentelemetry.io/otel/api/label"
 	"go.opentelemetry.io/otel/api/metric"
 	exportTest "go.opentelemetry.io/otel/exporters/metric/test"
 	export "go.opentelemetry.io/otel/sdk/export/metric"
 	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
 	"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
 	"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
 	"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
 	"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
 	"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
 	"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
 	"go.opentelemetry.io/otel/sdk/metric/integrator/simple"
 	"go.opentelemetry.io/otel/sdk/metric/integrator/test"
 	"go.opentelemetry.io/otel/sdk/resource"
 )

-// Note: This var block and the helpers below will disappear in a
-// future PR (see the draft in #799). The test has been completely
-// rewritten there, so this code will simply be dropped.
-
-var (
-	// Resource is applied to all test records built in this package.
-	Resource = resource.New(kv.String("R", "V"))
-
-	// LastValueADesc and LastValueBDesc group by "G"
-	LastValueADesc = metric.NewDescriptor(
-		"a.lastvalue", metric.ValueObserverKind, metric.Int64NumberKind)
-	LastValueBDesc = metric.NewDescriptor(
-		"b.lastvalue", metric.ValueObserverKind, metric.Int64NumberKind)
-	// CounterADesc and CounterBDesc group by "C"
-	CounterADesc = metric.NewDescriptor(
-		"a.sum", metric.CounterKind, metric.Int64NumberKind)
-	CounterBDesc = metric.NewDescriptor(
-		"b.sum", metric.CounterKind, metric.Int64NumberKind)
-
-	// LastValue groups are (labels1), (labels2+labels3)
-	// Counter groups are (labels1+labels2), (labels3)
-
-	// Labels1 has G=H and C=D
-	Labels1 = makeLabels(kv.String("G", "H"), kv.String("C", "D"))
-	// Labels2 has C=D and E=F
-	Labels2 = makeLabels(kv.String("C", "D"), kv.String("E", "F"))
-	// Labels3 is the empty set
-	Labels3 = makeLabels()
-)
-
-func makeLabels(labels ...kv.KeyValue) *label.Set {
-	s := label.NewSet(labels...)
-	return &s
-}
+// TestIntegrator tests all the non-error paths in this package.
+func TestIntegrator(t *testing.T) {
+	type exportCase struct {
+		kind export.ExportKind
+	}
+	type instrumentCase struct {
+		kind metric.Kind
+	}
+	type numberCase struct {
+		kind metric.NumberKind
+	}
+	type aggregatorCase struct {
+		kind aggregation.Kind
+	}
+
+	for _, tc := range []exportCase{
+		{kind: export.PassThroughExporter},
+		{kind: export.CumulativeExporter},
+		{kind: export.DeltaExporter},
+	} {
+		t.Run(tc.kind.String(), func(t *testing.T) {
+			for _, ic := range []instrumentCase{
+				{kind: metric.CounterKind},
+				{kind: metric.UpDownCounterKind},
+				{kind: metric.ValueRecorderKind},
+				{kind: metric.SumObserverKind},
+				{kind: metric.UpDownSumObserverKind},
+				{kind: metric.ValueObserverKind},
+			} {
+				t.Run(ic.kind.String(), func(t *testing.T) {
+					for _, nc := range []numberCase{
+						{kind: metric.Int64NumberKind},
+						{kind: metric.Float64NumberKind},
+					} {
+						t.Run(nc.kind.String(), func(t *testing.T) {
+							for _, ac := range []aggregatorCase{
+								{kind: aggregation.SumKind},
+								{kind: aggregation.MinMaxSumCountKind},
+								{kind: aggregation.HistogramKind},
+								{kind: aggregation.LastValueKind},
+								{kind: aggregation.ExactKind},
+								{kind: aggregation.SketchKind},
+							} {
+								t.Run(ac.kind.String(), func(t *testing.T) {
+									testSynchronousIntegration(
+										t,
+										tc.kind,
+										ic.kind,
+										nc.kind,
+										ac.kind,
+									)
+								})
+							}
+						})
+					}
+				})
+			}
+		})
+	}
+}
// LastValueAgg returns a checkpointed lastValue aggregator w/ the specified descriptor and value.
|
||||
func LastValueAgg(desc *metric.Descriptor, v int64) export.Aggregator {
|
||||
type testSelector struct {
|
||||
kind aggregation.Kind
|
||||
}

func (ts testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) {
	for i := range aggPtrs {
		switch ts.kind {
		case aggregation.SumKind:
			*aggPtrs[i] = &sum.New(1)[0]
		case aggregation.MinMaxSumCountKind:
			*aggPtrs[i] = &minmaxsumcount.New(1, desc)[0]
		case aggregation.HistogramKind:
			*aggPtrs[i] = &histogram.New(1, desc, nil)[0]
		case aggregation.LastValueKind:
			*aggPtrs[i] = &lastvalue.New(1)[0]
		case aggregation.SketchKind:
			*aggPtrs[i] = &ddsketch.New(1, desc, nil)[0]
		case aggregation.ExactKind:
			*aggPtrs[i] = &array.New(1)[0]
		}
	}
}
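
// For example (a sketch; the descriptor name "test" is arbitrary), a
// caller can request a single sum aggregator like so:
//
//	desc := metric.NewDescriptor("test", metric.CounterKind, metric.Int64NumberKind)
//	var agg export.Aggregator
//	testSelector{kind: aggregation.SumKind}.AggregatorFor(&desc, &agg)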

func testSynchronousIntegration(
	t *testing.T,
	ekind export.ExportKind,
	mkind metric.Kind,
	nkind metric.NumberKind,
	akind aggregation.Kind,
) {
	ctx := context.Background()
	selector := testSelector{akind}
	res := resource.New(kv.String("R", "V"))

	asNumber := func(value int64) metric.Number {
		if nkind == metric.Int64NumberKind {
			return metric.NewInt64Number(value)
		}
		return metric.NewFloat64Number(float64(value))
	}

	updateFor := func(desc *metric.Descriptor, value int64, labs []kv.KeyValue) export.Accumulation {
		ls := label.NewSet(labs...)
		var agg export.Aggregator
		selector.AggregatorFor(desc, &agg)
		_ = agg.Update(ctx, asNumber(value), desc)

		return export.NewAccumulation(desc, &ls, res, agg)
	}

	labs1 := []kv.KeyValue{kv.String("L1", "V")}
	labs2 := []kv.KeyValue{kv.String("L2", "V")}

	desc1 := metric.NewDescriptor("inst1", mkind, nkind)
	desc2 := metric.NewDescriptor("inst2", mkind, nkind)

	// For 1 to 3 accumulators:
	for NAccum := 1; NAccum <= 3; NAccum++ {
		t.Run(fmt.Sprintf("NumAccum=%d", NAccum), func(t *testing.T) {
			// For 1 to 3 checkpoints:
			for NCheckpoint := 1; NCheckpoint <= 3; NCheckpoint++ {
				t.Run(fmt.Sprintf("NumCkpt=%d", NCheckpoint), func(t *testing.T) {

					integrator := simple.New(selector, ekind)

					for nc := 0; nc < NCheckpoint; nc++ {

						// The input is 10 per update, scaled by
						// the number of checkpoints for
						// cumulative instruments:
						input := int64(10)
						cumulativeMultiplier := int64(nc + 1)
						if mkind.PrecomputedSum() {
							input *= cumulativeMultiplier
						}
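
						// For example, with NCheckpoint=3 a SumObserver (a
						// precomputed sum) reports 10, 20, 30 across the
						// three checkpoints, while a Counter contributes a
						// fresh 10-unit delta in each one.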

						integrator.StartCollection()

						for na := 0; na < NAccum; na++ {
							_ = integrator.Process(updateFor(&desc1, input, labs1))
							_ = integrator.Process(updateFor(&desc2, input, labs2))
						}

						err := integrator.FinishCollection()
						if err == aggregation.ErrNoSubtraction {
							var subr export.Aggregator
							selector.AggregatorFor(&desc1, &subr)
							_, canSub := subr.(export.Subtractor)

							// Allow the unsupported-subtraction case only when it is called for.
							require.True(t, mkind.PrecomputedSum() && ekind == export.DeltaExporter && !canSub)
							return
						} else if err != nil {
							t.Fatal(fmt.Sprint("unexpected FinishCollection error: ", err))
						}
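
						// (A DeltaExporter asked to export a precomputed sum
						// must subtract successive cumulative values, which
						// only aggregators implementing export.Subtractor
						// support.)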

						if nc < NCheckpoint-1 {
							continue
						}

						checkpointSet := integrator.CheckpointSet()

						// Test the final checkpoint state.
						records1 := test.NewOutput(label.DefaultEncoder())
						err = checkpointSet.ForEach(ekind, records1.AddRecord)

						// Test for an allowed error:
						if err != nil && err != aggregation.ErrNoSubtraction {
							t.Fatal(fmt.Sprint("unexpected checkpoint error: ", err))
						}

						var multiplier int64

						if mkind.Asynchronous() {
							// Because async instruments take the last value,
							// the number of accumulators doesn't matter.
							if mkind.PrecomputedSum() {
								if ekind == export.DeltaExporter {
									multiplier = 1
								} else {
									multiplier = cumulativeMultiplier
								}
							} else {
								if ekind == export.CumulativeExporter && akind != aggregation.LastValueKind {
									multiplier = cumulativeMultiplier
								} else {
									multiplier = 1
								}
							}
						} else {
							// Synchronous instruments accumulate results from
							// multiple accumulators; use that number as the
							// baseline multiplier.
							multiplier = int64(NAccum)
							if ekind == export.CumulativeExporter {
								// If a cumulative exporter, include prior checkpoints.
								multiplier *= cumulativeMultiplier
							}
							if akind == aggregation.LastValueKind {
								// If a last-value aggregator, set multiplier to 1.0.
								multiplier = 1
							}
						}
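
						// Worked examples with NAccum=2 and nc=1 (so
						// cumulativeMultiplier=2): a synchronous Counter
						// under a CumulativeExporter expects 2*2*10 = 40 per
						// label set; an asynchronous SumObserver under a
						// DeltaExporter expects 10.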

						require.EqualValues(t, map[string]float64{
							"inst1/L1=V/R=V": float64(multiplier * 10), // labels1
							"inst2/L2=V/R=V": float64(multiplier * 10), // labels2
						}, records1.Map)
					}
				})
			}
		})
	}
}

// Convenience function for building a test exported lastValue record.
func NewLastValueAccumulation(desc *metric.Descriptor, labels *label.Set, value int64) export.Accumulation {
	return export.NewAccumulation(desc, labels, Resource, LastValueAgg(desc, value))
}

type bogusExporter struct{}

func (bogusExporter) ExportKindFor(*metric.Descriptor, aggregation.Kind) export.ExportKind {
	return 1000000
}

// Convenience function for building a test exported counter record.
func NewCounterAccumulation(desc *metric.Descriptor, labels *label.Set, value int64) export.Accumulation {
	return export.NewAccumulation(desc, labels, Resource, CounterAgg(desc, value))
}

// CounterAgg returns a checkpointed counter aggregator w/ the specified descriptor and value.
func CounterAgg(desc *metric.Descriptor, v int64) export.Aggregator {
	ctx := context.Background()
	cagg := &sum.New(1)[0]
	_ = cagg.Update(ctx, metric.NewInt64Number(v), desc)
	return cagg
}
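
// For example (a sketch), a 10-unit counter accumulation under Labels1
// is built as:
//
//	acc := NewCounterAccumulation(&CounterADesc, Labels1, 10)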

func TestSimpleStateless(t *testing.T) {
	b := simple.New(test.AggregationSelector(), false)

	b.StartCollection()

	// Set initial lastValue values
	_ = b.Process(NewLastValueAccumulation(&LastValueADesc, Labels1, 10))
	_ = b.Process(NewLastValueAccumulation(&LastValueADesc, Labels2, 20))
	_ = b.Process(NewLastValueAccumulation(&LastValueADesc, Labels3, 30))

	_ = b.Process(NewLastValueAccumulation(&LastValueBDesc, Labels1, 10))
	_ = b.Process(NewLastValueAccumulation(&LastValueBDesc, Labels2, 20))
	_ = b.Process(NewLastValueAccumulation(&LastValueBDesc, Labels3, 30))

	// Another lastValue Set for Labels1
	_ = b.Process(NewLastValueAccumulation(&LastValueADesc, Labels1, 50))
	_ = b.Process(NewLastValueAccumulation(&LastValueBDesc, Labels1, 50))

	// Set initial counter values
	_ = b.Process(NewCounterAccumulation(&CounterADesc, Labels1, 10))
	_ = b.Process(NewCounterAccumulation(&CounterADesc, Labels2, 20))
	_ = b.Process(NewCounterAccumulation(&CounterADesc, Labels3, 40))

	_ = b.Process(NewCounterAccumulation(&CounterBDesc, Labels1, 10))
	_ = b.Process(NewCounterAccumulation(&CounterBDesc, Labels2, 20))
	_ = b.Process(NewCounterAccumulation(&CounterBDesc, Labels3, 40))

	// Another counter Add for Labels1
	_ = b.Process(NewCounterAccumulation(&CounterADesc, Labels1, 50))
	_ = b.Process(NewCounterAccumulation(&CounterBDesc, Labels1, 50))

	require.NoError(t, b.FinishCollection())

	checkpointSet := b.CheckpointSet()

	records := test.NewOutput(label.DefaultEncoder())
	_ = checkpointSet.ForEach(records.AddRecord)

	// Output lastvalue should have only the "G=H" and "G=" keys.
	// Output counter should have only the "C=D" and "C=" keys.
	require.EqualValues(t, map[string]float64{
		"a.sum/C=D,G=H/R=V":       60, // labels1
		"a.sum/C=D,E=F/R=V":       20, // labels2
		"a.sum//R=V":              40, // labels3
		"b.sum/C=D,G=H/R=V":       60, // labels1
		"b.sum/C=D,E=F/R=V":       20, // labels2
		"b.sum//R=V":              40, // labels3
		"a.lastvalue/C=D,G=H/R=V": 50, // labels1
		"a.lastvalue/C=D,E=F/R=V": 20, // labels2
		"a.lastvalue//R=V":        30, // labels3
		"b.lastvalue/C=D,G=H/R=V": 50, // labels1
		"b.lastvalue/C=D,E=F/R=V": 20, // labels2
		"b.lastvalue//R=V":        30, // labels3
	}, records.Map)
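
	// (Each labels1 counter key sums its two Process calls, 10+50 = 60,
	// while the lastValue records keep only the final value, 50.)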

	// Verify that state was reset
	b.StartCollection()
	require.NoError(t, b.FinishCollection())
	checkpointSet = b.CheckpointSet()
	_ = checkpointSet.ForEach(func(rec export.Record) error {
		t.Fatal("Unexpected call")
		return nil
	})
}

func TestSimpleStateful(t *testing.T) {
	ctx := context.Background()
	b := simple.New(test.AggregationSelector(), true)

	b.StartCollection()

	counterA := NewCounterAccumulation(&CounterADesc, Labels1, 10)
	_ = b.Process(counterA)

	counterB := NewCounterAccumulation(&CounterBDesc, Labels1, 10)
	_ = b.Process(counterB)
	require.NoError(t, b.FinishCollection())

	checkpointSet := b.CheckpointSet()

	records1 := test.NewOutput(label.DefaultEncoder())
	_ = checkpointSet.ForEach(records1.AddRecord)

	require.EqualValues(t, map[string]float64{
		"a.sum/C=D,G=H/R=V": 10, // labels1
		"b.sum/C=D,G=H/R=V": 10, // labels1
	}, records1.Map)

	alloc := sum.New(4)
	caggA, caggB, ckptA, ckptB := &alloc[0], &alloc[1], &alloc[2], &alloc[3]

	// Test that state was NOT reset
	checkpointSet = b.CheckpointSet()

	b.StartCollection()
	require.NoError(t, b.FinishCollection())

	records2 := test.NewOutput(label.DefaultEncoder())
	_ = checkpointSet.ForEach(records2.AddRecord)

	require.EqualValues(t, records1.Map, records2.Map)

	// Update and re-checkpoint the original record.
	_ = caggA.Update(ctx, metric.NewInt64Number(20), &CounterADesc)
	_ = caggB.Update(ctx, metric.NewInt64Number(20), &CounterBDesc)
	err := caggA.SynchronizedCopy(ckptA, &CounterADesc)
	require.NoError(t, err)
	err = caggB.SynchronizedCopy(ckptB, &CounterBDesc)
	require.NoError(t, err)

	// cagg has not yet been passed to Integrator.Process, so we
	// should not see an update.
	checkpointSet = b.CheckpointSet()

	records3 := test.NewOutput(label.DefaultEncoder())
	_ = checkpointSet.ForEach(records3.AddRecord)

	require.EqualValues(t, records1.Map, records3.Map)
	b.StartCollection()

	// Now process the second update
	_ = b.Process(export.NewAccumulation(&CounterADesc, Labels1, Resource, ckptA))
	_ = b.Process(export.NewAccumulation(&CounterBDesc, Labels1, Resource, ckptB))
	require.NoError(t, b.FinishCollection())

	checkpointSet = b.CheckpointSet()

	records4 := test.NewOutput(label.DefaultEncoder())
	_ = checkpointSet.ForEach(records4.AddRecord)

	require.EqualValues(t, map[string]float64{
		"a.sum/C=D,G=H/R=V": 30,
		"b.sum/C=D,G=H/R=V": 30,
	}, records4.Map)
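
	// (30 = the initial 10 plus the later 20-unit update; the stateful
	// integrator accumulates across collection intervals.)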
}

func (bogusExporter) Export(context.Context, export.CheckpointSet) error {
	panic("Not called")
}
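
// bogusExporter's ExportKindFor returns 1000000, which is not a valid
// export.ExportKind, so the tests below can exercise
// simple.ErrInvalidExporterKind.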

func TestSimpleInconsistent(t *testing.T) {
	// Test double-start
	b := simple.New(test.AggregationSelector(), export.PassThroughExporter)

	b.StartCollection()
	b.StartCollection()
	require.Equal(t, simple.ErrInconsistentState, b.FinishCollection())

	// Test finish without start
	b = simple.New(test.AggregationSelector(), export.PassThroughExporter)

	require.Equal(t, simple.ErrInconsistentState, b.FinishCollection())

	// Test no finish
	b = simple.New(test.AggregationSelector(), export.PassThroughExporter)

	b.StartCollection()
	require.Equal(
		t,
		simple.ErrInconsistentState,
		b.ForEach(
			export.PassThroughExporter,
			func(export.Record) error { return nil },
		),
	)

	// Test no start
	b = simple.New(test.AggregationSelector(), export.PassThroughExporter)

	desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind)
	accum := export.NewAccumulation(&desc, label.EmptySet(), resource.Empty(), exportTest.NoopAggregator{})
	require.Equal(t, simple.ErrInconsistentState, b.Process(accum))

	// Test invalid kind:
	b = simple.New(test.AggregationSelector(), export.PassThroughExporter)
	b.StartCollection()
	require.NoError(t, b.Process(accum))
	require.NoError(t, b.FinishCollection())

	err := b.ForEach(
		bogusExporter{},
		func(export.Record) error { return nil },
	)
	require.True(t, errors.Is(err, simple.ErrInvalidExporterKind))
}

func TestSimpleTimestamps(t *testing.T) {
	beforeNew := time.Now()
	b := simple.New(test.AggregationSelector(), export.PassThroughExporter)
	afterNew := time.Now()

	desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind)
	accum := export.NewAccumulation(&desc, label.EmptySet(), resource.Empty(), exportTest.NoopAggregator{})

	b.StartCollection()
	_ = b.Process(accum)
	require.NoError(t, b.FinishCollection())

	var start1, end1 time.Time

	require.NoError(t, b.ForEach(export.PassThroughExporter, func(rec export.Record) error {
		start1 = rec.StartTime()
		end1 = rec.EndTime()
		return nil

@ -277,12 +337,12 @@ func TestSimpleTimestamps(t *testing.T) {

	for i := 0; i < 2; i++ {
		b.StartCollection()
		require.NoError(t, b.Process(accum))
		require.NoError(t, b.FinishCollection())

		var start2, end2 time.Time

		require.NoError(t, b.ForEach(export.PassThroughExporter, func(rec export.Record) error {
			start2 = rec.StartTime()
			end2 = rec.EndTime()
			return nil