1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-18 03:22:12 +02:00
opentelemetry-go/sdk/metric/stress_test.go

503 lines
13 KiB
Go
Raw Normal View History

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This test is too large for the race detector. This SDK uses no locks
// that the race detector would help with, anyway.

//go:build !race
// +build !race

package metric
import (
"context"
"fmt"
"math"
"math/rand"
"runtime"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
)
// Tuning knobs for the stress test.
const (
	// concurrencyPerCPU is multiplied by the CPU count to size the
	// pool of worker goroutines.
	concurrencyPerCPU = 100

	// reclaimPeriod is the pause between background collections and
	// the base from which each worker's update period is derived.
	reclaimPeriod = 100 * time.Millisecond

	// testRun is how long the workers run before being told to stop.
	testRun = 5 * time.Second

	// epsilon is the relative tolerance for float64 comparisons.
	epsilon = 1e-10
)
// Must aliases metric.Must; the test impls in this file call it as
// Must(meter).NewXxx(name) to obtain an instrument as a single value.
var Must = metric.Must
// testFixture carries the shared state of one stress-test run. It is
// handed to NewAccumulator and receives collected data via Process.
type testFixture struct {
	// stop has to be aligned for 64-bit atomic operations.
	stop int64

	// expected holds, per testKey, the value the workers believe they
	// recorded; received holds what Process observed during collection.
	expected sync.Map
	received sync.Map // Note: doesn't require synchronization

	wg   sync.WaitGroup
	impl testImpl
	T    *testing.T
	export.AggregatorSelector

	// lock guards lused, the set of canonical label strings already
	// handed out by someLabels.
	lock  sync.Mutex
	lused map[string]bool

	// dupCheck is reset before every collection; totalDups counts the
	// keys seen more than once within a single collection.
	dupCheck  map[testKey]int
	totalDups int64
}

// testKey identifies one (label set, instrument) pair in the expected
// and received maps.
type testKey struct {
	labels     string
	descriptor *sdkapi.Descriptor
}

// testImpl bundles the instrument-specific callbacks that let a single
// stress-test driver exercise different instrument kinds.
type testImpl struct {
	newInstrument  func(meter metric.Meter, name string) SyncImpler
	getUpdateValue func() number.Number
	operate        func(interface{}, context.Context, number.Number, []attribute.KeyValue)
	newStore       func() interface{}

	// storeCollect and storeExpect are the same for
	// counters, different for lastValues, to ensure we are
	// testing the timestamps correctly.
	storeCollect func(store interface{}, value number.Number, ts time.Time)
	storeExpect  func(store interface{}, value number.Number)
	readStore    func(store interface{}) number.Number
	equalValues  func(a, b number.Number) bool
}

// SyncImpler is satisfied by instruments that expose their underlying
// sdkapi.SyncImpl.
type SyncImpler interface {
	SyncImpl() sdkapi.SyncImpl
}

// lastValueState supports merging lastValue values, for the case
// where a race condition causes duplicate records. We always
// take the later timestamp.
type lastValueState struct {
	// raw has to be aligned for 64-bit atomic operations.
	raw number.Number
	ts  time.Time
}
// concurrency returns the number of worker goroutines to launch:
// concurrencyPerCPU for every CPU reported by the runtime.
func concurrency() int {
	cpus := runtime.NumCPU()
	return cpus * concurrencyPerCPU
}
// canonicalizeLabels renders ls as a single string of "key=value$"
// segments ordered by key, so that equal label sets map to equal
// strings regardless of input order. The input slice is not modified.
func canonicalizeLabels(ls []attribute.KeyValue) string {
	// Sort a private copy so the caller's slice keeps its order.
	sorted := make([]attribute.KeyValue, len(ls))
	copy(sorted, ls)
	sort.SliceStable(sorted, func(a, b int) bool {
		return sorted[a].Key < sorted[b].Key
	})

	var out strings.Builder
	for idx := range sorted {
		out.WriteString(string(sorted[idx].Key) + "=" + sorted[idx].Value.Emit() + "$")
	}
	return out.String()
}
// getPeriod draws a randomized update period: reclaimPeriod scaled by
// a normally distributed factor (mean 1, standard deviation 0.1),
// never less than one tenth of reclaimPeriod.
func getPeriod() time.Duration {
	floor := float64(reclaimPeriod) / 10
	jittered := float64(reclaimPeriod) * (1 + 0.1*rand.NormFloat64())
	return time.Duration(math.Max(floor, jittered))
}
// someLabels generates between one and three random string labels
// with distinct keys and returns them once their canonical form has
// not been handed out before by this fixture. The canonical form is
// recorded in f.lused under f.lock.
func (f *testFixture) someLabels() []attribute.KeyValue {
	count := 1 + rand.Intn(3)
	labels := make([]attribute.KeyValue, count)

	for {
		// Keys must be unique within a single label set.
		seen := map[string]bool{}
		for idx := range labels {
			key := fmt.Sprint("k", rand.Intn(1000000000))
			for seen[key] {
				key = fmt.Sprint("k", rand.Intn(1000000000))
			}
			seen[key] = true
			labels[idx] = attribute.String(key, fmt.Sprint("v", rand.Intn(1000000000)))
		}

		canonical := canonicalizeLabels(labels)

		f.lock.Lock()
		taken := f.lused[canonical]
		if !taken {
			f.lused[canonical] = true
		}
		f.lock.Unlock()

		if !taken {
			return labels
		}
		// Already used by another worker: regenerate and try again.
	}
}
// startWorker is the body of one worker goroutine. It creates an
// instrument named "test_<i>", picks a label set unique to this
// fixture, and then repeatedly sleeps for a random interval, records
// a value through the instrument and mirrors that value into
// f.expected. The loop always performs at least one update and exits
// once f.stop is non-zero, signalling wg on the way out.
func (f *testFixture) startWorker(impl *Accumulator, meter metric.Meter, wg *sync.WaitGroup, i int) {
	defer wg.Done()

	ctx := context.Background()
	instrument := f.impl.newInstrument(meter, fmt.Sprint("test_", i))

	// The descriptor pointer is part of the key that Process uses, so
	// take it from the concrete SDK instrument when available.
	var descriptor *sdkapi.Descriptor
	if concrete, ok := instrument.SyncImpl().(*syncInstrument); ok {
		descriptor = &concrete.descriptor
	}

	kvs := f.someLabels()
	key := testKey{
		labels:     canonicalizeLabels(kvs),
		descriptor: descriptor,
	}
	period := getPeriod()

	for {
		pause := time.Duration(rand.ExpFloat64() * float64(period))
		time.Sleep(pause)

		value := f.impl.getUpdateValue()
		f.impl.operate(instrument, ctx, value, kvs)

		store, _ := f.expected.LoadOrStore(key, f.impl.newStore())
		f.impl.storeExpect(store, value)

		if atomic.LoadInt64(&f.stop) != 0 {
			return
		}
	}
}
// assertTest compares what was collected (f.received) with what the
// workers recorded (f.expected). It reports an error for every
// received key that has no expected entry or whose value differs, for
// every expected key that was never received, and for a difference in
// the number of entries between the two maps.
//
// numCollect is only referenced by the commented-out diagnostic at the
// end of the function.
func (f *testFixture) assertTest(numCollect int) {
	// Per-key failures are queued so that the size mismatch, if any,
	// is reported first.
	var allErrs []func()

	receivedCount := 0
	f.received.Range(func(key, gstore interface{}) bool {
		receivedCount++

		gvalue := f.impl.readStore(gstore)

		estore, loaded := f.expected.Load(key)
		if !loaded {
			allErrs = append(allErrs, func() {
				f.T.Error("Could not locate expected key: ", key)
			})
			return true
		}
		evalue := f.impl.readStore(estore)

		if !f.impl.equalValues(evalue, gvalue) {
			allErrs = append(allErrs, func() {
				f.T.Error("Expected value mismatch: ",
					evalue, "!=", gvalue, " for ", key)
			})
		}
		return true
	})

	expectedCount := 0
	f.expected.Range(func(key, value interface{}) bool {
		expectedCount++
		if _, loaded := f.received.Load(key); !loaded {
			allErrs = append(allErrs, func() {
				f.T.Error("Did not receive expected key: ", key)
			})
		}
		return true
	})

	if receivedCount != expectedCount {
		// Print the counts in the order the message names them.
		f.T.Error("Did not receive the correct set of metrics: Received != Expected", receivedCount, expectedCount)
	}

	for _, anErr := range allErrs {
		anErr()
	}

	// Note: It's useful to know the test triggers this condition,
	// but we can't assert it.  Infrequently no duplicates are
	// found, and we can't really force a race to happen.
	//
	// fmt.Printf("Test duplicate records seen: %.1f%%\n",
	// 	float64(100*f.totalDups/int64(numCollect*concurrency())))
}
func (f *testFixture) preCollect() {
Metrics stdout export pipeline (#265) * Add MetricAggregator.Merge() implementations * Update from feedback * Type * Ckpt * Ckpt * Add push controller * Ckpt * Add aggregator interfaces, stdout encoder * Modify basic main.go * Main is working * Batch stdout output * Sum udpate * Rename stdout * Add stateless/stateful Batcher options * Undo a for-loop in the example, remove a done TODO * Update imports * Add note * Rename defaultkeys * Support variable label encoder to speed OpenMetrics/Statsd export * Lint * Doc * Precommit/lint * Simplify Aggregator API * Record->Identifier * Remove export.Record a.k.a. Identifier * Checkpoint * Propagate errors to the SDK, remove a bunch of 'TODO warn' * Checkpoint * Introduce export.Labels * Comments in export/metric.go * Comment * More merge * More doc * Complete example * Lint fixes * Add a testable example * Lint * Let Export return an error * add a basic stdout exporter test * Add measure test; fix aggregator APIs * Use JSON numbers, not strings * Test stdout exporter error * Add a test for the call to RangeTest * Add error handler API to improve correctness test; return errors from RecordOne * Undo the previous -- do not expose errors * Add simple selector variations, test * Repair examples * Test push controller error handling * Add SDK label encoder tests * Add a defaultkeys batcher test * Add an ungrouped batcher test * Lint new tests * Respond to krnowak's feedback * Undo comment * Use concrete receivers for export records and labels, since the constructors return structs not pointers * Bug fix for stateful batchers; clone an aggregator for long term storage * Remove TODO addressed in #318 * Add errors to all aggregator interfaces * Handle ErrNoLastValue case in stdout exporter * Move aggregator API into sdk/export/metric/aggregator * Update all aggregator exported-method comments * Document the aggregator APIs * More aggregator comments * Add multiple updates to the ungrouped test * Fixes for feedback from Gustavo and 
Liz * Producer->CheckpointSet; add FinishedCollection * Process takes an export.Record * ReadCheckpoint->CheckpointSet * EncodeLabels->Encode * Format a better inconsistent type error; add more aggregator API tests * More RangeTest test coverage * Make benbjohnson/clock a test-only dependency * Handle ErrNoLastValue in stress_test
2019-11-15 13:01:20 -08:00
// Collect calls Process in a single-threaded context. No need
// to lock this struct.
f.dupCheck = map[testKey]int{}
}
Separate InstrumentationLibrary from metric.Descriptor (#2197) * factor instrumentation library out of the instrument descriptor * SDK tests pass * checkpoint work * otlp and opencensus tests passing * prometheus * tests pass, working on lint * lint applied: MetricReader->Reader * comments * Changelog * Apply suggestions from code review Co-authored-by: alrex <alrex.boten@gmail.com> * remove an interdependency * fix build * re-indent one * Apply suggestions from code review Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com> * Lint&feedback * update after rename * comment fix * style fix for meter options * remove libraryReader, let Controller implement the reader API directly * rename 'impl' field to 'provider' * remove a type assertion * move metric/registry into internal; move registry.MeterProvider into metric controller * add test for controller registry function * CheckpointSet->Reader everywhere * lint * remove two unnecessary accessor methods; Controller implements MeterProvider and InstrumentationLibraryReader directly, no need to get these * use a sync.Map * ensure the initOnce is always called; handle multiple errors * Apply suggestions from code review Co-authored-by: Anthony Mirabella <a9@aneurysm9.com> * cleanup locking in metrictest * Revert "ensure the initOnce is always called; handle multiple errors" This reverts commit 3356eb5ed0c288ac3edcc2bc2e853aecda7f29b3. * Revert "use a sync.Map" This reverts commit ea7bc599bd3a24c4acb4cd9facd13f08cd702237. * restore the TODO about sync.Map Co-authored-by: alrex <alrex.boten@gmail.com> Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com> Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
2021-09-27 08:51:47 -07:00
func (*testFixture) Reader() export.Reader {
Metrics stdout export pipeline (#265) * Add MetricAggregator.Merge() implementations * Update from feedback * Type * Ckpt * Ckpt * Add push controller * Ckpt * Add aggregator interfaces, stdout encoder * Modify basic main.go * Main is working * Batch stdout output * Sum udpate * Rename stdout * Add stateless/stateful Batcher options * Undo a for-loop in the example, remove a done TODO * Update imports * Add note * Rename defaultkeys * Support variable label encoder to speed OpenMetrics/Statsd export * Lint * Doc * Precommit/lint * Simplify Aggregator API * Record->Identifier * Remove export.Record a.k.a. Identifier * Checkpoint * Propagate errors to the SDK, remove a bunch of 'TODO warn' * Checkpoint * Introduce export.Labels * Comments in export/metric.go * Comment * More merge * More doc * Complete example * Lint fixes * Add a testable example * Lint * Let Export return an error * add a basic stdout exporter test * Add measure test; fix aggregator APIs * Use JSON numbers, not strings * Test stdout exporter error * Add a test for the call to RangeTest * Add error handler API to improve correctness test; return errors from RecordOne * Undo the previous -- do not expose errors * Add simple selector variations, test * Repair examples * Test push controller error handling * Add SDK label encoder tests * Add a defaultkeys batcher test * Add an ungrouped batcher test * Lint new tests * Respond to krnowak's feedback * Undo comment * Use concrete receivers for export records and labels, since the constructors return structs not pointers * Bug fix for stateful batchers; clone an aggregator for long term storage * Remove TODO addressed in #318 * Add errors to all aggregator interfaces * Handle ErrNoLastValue case in stdout exporter * Move aggregator API into sdk/export/metric/aggregator * Update all aggregator exported-method comments * Document the aggregator APIs * More aggregator comments * Add multiple updates to the ungrouped test * Fixes for feedback from Gustavo and 
Liz * Producer->CheckpointSet; add FinishedCollection * Process takes an export.Record * ReadCheckpoint->CheckpointSet * EncodeLabels->Encode * Format a better inconsistent type error; add more aggregator API tests * More RangeTest test coverage * Make benbjohnson/clock a test-only dependency * Handle ErrNoLastValue in stress_test
2019-11-15 13:01:20 -08:00
return nil
}
func (f *testFixture) Process(accumulation export.Accumulation) error {
labels := accumulation.Labels().ToSlice()
key := testKey{
Replace `Ordered` with an iterator in `export.Labels`. (#567) * Do not expose a slice of labels in export.Record This is really an inconvenient implementation detail leak - we may want to store labels in a different way. Replace it with an iterator - it does not force us to use slice of key values as a storage in the long run. * Add Len to LabelIterator It may come in handy in several situations, where we don't have access to export.Labels object, but only to the label iterator. * Use reflect value label iterator for the fixed labels * add reset operation to iterator Makes my life easier when writing a benchmark. Might also be an alternative to cloning the iterator. * Add benchmarks for iterators * Add import comment * Add clone operation to label iterator * Move iterator tests to a separate package * Add tests for cloning iterators * Pass label iterator to export labels * Use non-addressable array reflect values By not using the value created by `reflect.New()`, but rather by `reflect.ValueOf()`, we get a non-addressable array in the value, which does not infer an allocation cost when getting an element from the array. * Drop zero iterator This can be substituted by a reflect value iterator that goes over a value with a zero-sized array. * Add a simple iterator that implements label iterator In the long run this will completely replace the LabelIterator interface. * Replace reflect value iterator with simple iterator * Pass label storage to new export labels, not label iterator * Drop label iterator interface, rename storage iterator to label iterator * Drop clone operation from iterator It's a leftover from interface times and now it's pointless - the iterator is a simple struct, so cloning it is a simple copy. * Drop Reset from label iterator The sole existence of Reset was actually for benchmarking convenience. Now we can just copy the iterator cheaply, so a need for Reset is no more. 
* Drop noop iterator tests * Move back iterator tests to export package * Eagerly get the reflect value of ordered labels So we won't get into problems when several goroutines want to iterate the same labels at the same time. Not sure if this would be a big deal, since every goroutine would compute the same reflect.Value, but concurrent write to the same memory is bad anyway. And it doesn't cost us any extra allocations anyway. * Replace NewSliceLabelIterator() with a method of LabelSlice * Add some documentation * Documentation fixes
2020-03-19 23:01:34 +01:00
labels: canonicalizeLabels(labels),
descriptor: accumulation.Descriptor(),
}
if f.dupCheck[key] == 0 {
f.dupCheck[key]++
} else {
f.totalDups++
}
actual, _ := f.received.LoadOrStore(key, f.impl.newStore())
agg := accumulation.Aggregator()
switch accumulation.Descriptor().InstrumentKind() {
case sdkapi.CounterInstrumentKind:
2020-06-09 22:53:30 -07:00
sum, err := agg.(aggregation.Sum).Sum()
Metrics stdout export pipeline (#265) * Add MetricAggregator.Merge() implementations * Update from feedback * Type * Ckpt * Ckpt * Add push controller * Ckpt * Add aggregator interfaces, stdout encoder * Modify basic main.go * Main is working * Batch stdout output * Sum udpate * Rename stdout * Add stateless/stateful Batcher options * Undo a for-loop in the example, remove a done TODO * Update imports * Add note * Rename defaultkeys * Support variable label encoder to speed OpenMetrics/Statsd export * Lint * Doc * Precommit/lint * Simplify Aggregator API * Record->Identifier * Remove export.Record a.k.a. Identifier * Checkpoint * Propagate errors to the SDK, remove a bunch of 'TODO warn' * Checkpoint * Introduce export.Labels * Comments in export/metric.go * Comment * More merge * More doc * Complete example * Lint fixes * Add a testable example * Lint * Let Export return an error * add a basic stdout exporter test * Add measure test; fix aggregator APIs * Use JSON numbers, not strings * Test stdout exporter error * Add a test for the call to RangeTest * Add error handler API to improve correctness test; return errors from RecordOne * Undo the previous -- do not expose errors * Add simple selector variations, test * Repair examples * Test push controller error handling * Add SDK label encoder tests * Add a defaultkeys batcher test * Add an ungrouped batcher test * Lint new tests * Respond to krnowak's feedback * Undo comment * Use concrete receivers for export records and labels, since the constructors return structs not pointers * Bug fix for stateful batchers; clone an aggregator for long term storage * Remove TODO addressed in #318 * Add errors to all aggregator interfaces * Handle ErrNoLastValue case in stdout exporter * Move aggregator API into sdk/export/metric/aggregator * Update all aggregator exported-method comments * Document the aggregator APIs * More aggregator comments * Add multiple updates to the ungrouped test * Fixes for feedback from Gustavo and 
Liz * Producer->CheckpointSet; add FinishedCollection * Process takes an export.Record * ReadCheckpoint->CheckpointSet * EncodeLabels->Encode * Format a better inconsistent type error; add more aggregator API tests * More RangeTest test coverage * Make benbjohnson/clock a test-only dependency * Handle ErrNoLastValue in stress_test
2019-11-15 13:01:20 -08:00
if err != nil {
f.T.Fatal("Sum error: ", err)
}
f.impl.storeCollect(actual, sum, time.Time{})
case sdkapi.HistogramInstrumentKind:
2020-06-09 22:53:30 -07:00
lv, ts, err := agg.(aggregation.LastValue).LastValue()
if err != nil && err != aggregation.ErrNoData {
Metrics stdout export pipeline (#265) * Add MetricAggregator.Merge() implementations * Update from feedback * Type * Ckpt * Ckpt * Add push controller * Ckpt * Add aggregator interfaces, stdout encoder * Modify basic main.go * Main is working * Batch stdout output * Sum udpate * Rename stdout * Add stateless/stateful Batcher options * Undo a for-loop in the example, remove a done TODO * Update imports * Add note * Rename defaultkeys * Support variable label encoder to speed OpenMetrics/Statsd export * Lint * Doc * Precommit/lint * Simplify Aggregator API * Record->Identifier * Remove export.Record a.k.a. Identifier * Checkpoint * Propagate errors to the SDK, remove a bunch of 'TODO warn' * Checkpoint * Introduce export.Labels * Comments in export/metric.go * Comment * More merge * More doc * Complete example * Lint fixes * Add a testable example * Lint * Let Export return an error * add a basic stdout exporter test * Add measure test; fix aggregator APIs * Use JSON numbers, not strings * Test stdout exporter error * Add a test for the call to RangeTest * Add error handler API to improve correctness test; return errors from RecordOne * Undo the previous -- do not expose errors * Add simple selector variations, test * Repair examples * Test push controller error handling * Add SDK label encoder tests * Add a defaultkeys batcher test * Add an ungrouped batcher test * Lint new tests * Respond to krnowak's feedback * Undo comment * Use concrete receivers for export records and labels, since the constructors return structs not pointers * Bug fix for stateful batchers; clone an aggregator for long term storage * Remove TODO addressed in #318 * Add errors to all aggregator interfaces * Handle ErrNoLastValue case in stdout exporter * Move aggregator API into sdk/export/metric/aggregator * Update all aggregator exported-method comments * Document the aggregator APIs * More aggregator comments * Add multiple updates to the ungrouped test * Fixes for feedback from Gustavo and 
Liz * Producer->CheckpointSet; add FinishedCollection * Process takes an export.Record * ReadCheckpoint->CheckpointSet * EncodeLabels->Encode * Format a better inconsistent type error; add more aggregator API tests * More RangeTest test coverage * Make benbjohnson/clock a test-only dependency * Handle ErrNoLastValue in stress_test
2019-11-15 13:01:20 -08:00
f.T.Fatal("Last value error: ", err)
}
f.impl.storeCollect(actual, lv, ts)
default:
panic("Not used in this test")
}
Metrics stdout export pipeline (#265) * Add MetricAggregator.Merge() implementations * Update from feedback * Type * Ckpt * Ckpt * Add push controller * Ckpt * Add aggregator interfaces, stdout encoder * Modify basic main.go * Main is working * Batch stdout output * Sum udpate * Rename stdout * Add stateless/stateful Batcher options * Undo a for-loop in the example, remove a done TODO * Update imports * Add note * Rename defaultkeys * Support variable label encoder to speed OpenMetrics/Statsd export * Lint * Doc * Precommit/lint * Simplify Aggregator API * Record->Identifier * Remove export.Record a.k.a. Identifier * Checkpoint * Propagate errors to the SDK, remove a bunch of 'TODO warn' * Checkpoint * Introduce export.Labels * Comments in export/metric.go * Comment * More merge * More doc * Complete example * Lint fixes * Add a testable example * Lint * Let Export return an error * add a basic stdout exporter test * Add measure test; fix aggregator APIs * Use JSON numbers, not strings * Test stdout exporter error * Add a test for the call to RangeTest * Add error handler API to improve correctness test; return errors from RecordOne * Undo the previous -- do not expose errors * Add simple selector variations, test * Repair examples * Test push controller error handling * Add SDK label encoder tests * Add a defaultkeys batcher test * Add an ungrouped batcher test * Lint new tests * Respond to krnowak's feedback * Undo comment * Use concrete receivers for export records and labels, since the constructors return structs not pointers * Bug fix for stateful batchers; clone an aggregator for long term storage * Remove TODO addressed in #318 * Add errors to all aggregator interfaces * Handle ErrNoLastValue case in stdout exporter * Move aggregator API into sdk/export/metric/aggregator * Update all aggregator exported-method comments * Document the aggregator APIs * More aggregator comments * Add multiple updates to the ungrouped test * Fixes for feedback from Gustavo and 
Liz * Producer->CheckpointSet; add FinishedCollection * Process takes an export.Record * ReadCheckpoint->CheckpointSet * EncodeLabels->Encode * Format a better inconsistent type error; add more aggregator API tests * More RangeTest test coverage * Make benbjohnson/clock a test-only dependency * Handle ErrNoLastValue in stress_test
2019-11-15 13:01:20 -08:00
return nil
}
func stressTest(t *testing.T, impl testImpl) {
ctx := context.Background()
t.Parallel()
fixture := &testFixture{
T: t,
impl: impl,
lused: map[string]bool{},
AggregatorSelector: processortest.AggregatorSelector(),
}
cc := concurrency()
sdk := NewAccumulator(fixture)
Separate InstrumentationLibrary from metric.Descriptor (#2197) * factor instrumentation library out of the instrument descriptor * SDK tests pass * checkpoint work * otlp and opencensus tests passing * prometheus * tests pass, working on lint * lint applied: MetricReader->Reader * comments * Changelog * Apply suggestions from code review Co-authored-by: alrex <alrex.boten@gmail.com> * remove an interdependency * fix build * re-indent one * Apply suggestions from code review Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com> * Lint&feedback * update after rename * comment fix * style fix for meter options * remove libraryReader, let Controller implement the reader API directly * rename 'impl' field to 'provider' * remove a type assertion * move metric/registry into internal; move registry.MeterProvider into metric controller * add test for controller registry function * CheckpointSet->Reader everywhere * lint * remove two unnecessary accessor methods; Controller implements MeterProvider and InstrumentationLibraryReader directly, no need to get these * use a sync.Map * ensure the initOnce is always called; handle multiple errors * Apply suggestions from code review Co-authored-by: Anthony Mirabella <a9@aneurysm9.com> * cleanup locking in metrictest * Revert "ensure the initOnce is always called; handle multiple errors" This reverts commit 3356eb5ed0c288ac3edcc2bc2e853aecda7f29b3. * Revert "use a sync.Map" This reverts commit ea7bc599bd3a24c4acb4cd9facd13f08cd702237. * restore the TODO about sync.Map Co-authored-by: alrex <alrex.boten@gmail.com> Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com> Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
2021-09-27 08:51:47 -07:00
meter := metric.WrapMeterImpl(sdk)
fixture.wg.Add(cc + 1)
for i := 0; i < cc; i++ {
go fixture.startWorker(sdk, meter, &fixture.wg, i)
}
numCollect := 0
go func() {
for {
time.Sleep(reclaimPeriod)
fixture.preCollect()
sdk.Collect(ctx)
numCollect++
if atomic.LoadInt64(&fixture.stop) != 0 {
fixture.wg.Done()
return
}
}
}()
time.Sleep(testRun)
atomic.StoreInt64(&fixture.stop, 1)
fixture.wg.Wait()
fixture.preCollect()
sdk.Collect(ctx)
numCollect++
fixture.assertTest(numCollect)
}
// int64sEqual reports whether a and b hold the same value when both
// are read as int64.
func int64sEqual(a, b number.Number) bool {
	lhs, rhs := a.AsInt64(), b.AsInt64()
	return lhs == rhs
}
// float64sEqual reports whether a and b, read as float64, agree to
// within a relative tolerance of epsilon. The tolerance is scaled by
// the larger magnitude of the two so the result does not depend on
// argument order.
func float64sEqual(a, b number.Number) bool {
	x, y := a.AsFloat64(), b.AsFloat64()
	if x == y {
		// Exact matches always count as equal. This covers 0 == 0,
		// where a relative tolerance is itself zero and a strict
		// "less than" comparison would report a mismatch.
		return true
	}
	diff := math.Abs(x - y)
	scale := math.Max(math.Abs(x), math.Abs(y))
	return diff <= scale*epsilon
}
// Counters
// intCounterTestImpl returns the testImpl for an int64 counter named
// name+".sum". Updates are non-zero integers below 100, and both the
// expected and the collected store accumulate by atomic addition.
func intCounterTestImpl() testImpl {
	// Expected and collected values accumulate identically.
	accumulate := func(store interface{}, delta number.Number) {
		store.(*number.Number).AddInt64Atomic(delta.AsInt64())
	}

	return testImpl{
		newInstrument: func(meter metric.Meter, name string) SyncImpler {
			return Must(meter).NewInt64Counter(name + ".sum")
		},
		getUpdateValue: func() number.Number {
			// Zero is rejected; redraw until the value is non-zero.
			v := int64(rand.Intn(100))
			for v == 0 {
				v = int64(rand.Intn(100))
			}
			return number.NewInt64Number(v)
		},
		operate: func(inst interface{}, ctx context.Context, value number.Number, labels []attribute.KeyValue) {
			inst.(metric.Int64Counter).Add(ctx, value.AsInt64(), labels...)
		},
		newStore: func() interface{} {
			zero := number.NewInt64Number(0)
			return &zero
		},
		storeCollect: func(store interface{}, value number.Number, _ time.Time) {
			accumulate(store, value)
		},
		storeExpect: accumulate,
		readStore: func(store interface{}) number.Number {
			total := store.(*number.Number)
			return total.AsNumberAtomic()
		},
		equalValues: int64sEqual,
	}
}
func TestStressInt64Counter(t *testing.T) {
stressTest(t, intCounterTestImpl())
}
// floatCounterTestImpl returns the testImpl for a float64 counter
// named name+".sum". Updates are non-zero values drawn from
// rand.Float64, and both the expected and the collected store
// accumulate by atomic addition.
func floatCounterTestImpl() testImpl {
	// Expected and collected values accumulate identically.
	accumulate := func(store interface{}, delta number.Number) {
		store.(*number.Number).AddFloat64Atomic(delta.AsFloat64())
	}

	return testImpl{
		newInstrument: func(meter metric.Meter, name string) SyncImpler {
			return Must(meter).NewFloat64Counter(name + ".sum")
		},
		getUpdateValue: func() number.Number {
			// Zero is rejected; redraw until the value is non-zero.
			v := rand.Float64()
			for v == 0 {
				v = rand.Float64()
			}
			return number.NewFloat64Number(v)
		},
		operate: func(inst interface{}, ctx context.Context, value number.Number, labels []attribute.KeyValue) {
			inst.(metric.Float64Counter).Add(ctx, value.AsFloat64(), labels...)
		},
		newStore: func() interface{} {
			zero := number.NewFloat64Number(0.0)
			return &zero
		},
		storeCollect: func(store interface{}, value number.Number, _ time.Time) {
			accumulate(store, value)
		},
		storeExpect: accumulate,
		readStore: func(store interface{}) number.Number {
			total := store.(*number.Number)
			return total.AsNumberAtomic()
		},
		equalValues: float64sEqual,
	}
}
func TestStressFloat64Counter(t *testing.T) {
stressTest(t, floatCounterTestImpl())
}
// LastValue
// intLastValueTestImpl returns the testImpl for an int64 histogram
// instrument named name+".lastvalue". The expected store is simply
// overwritten on every update, while the collected store only accepts
// a value whose timestamp is not older than the one it already holds.
func intLastValueTestImpl() testImpl {
	return testImpl{
		newInstrument: func(meter metric.Meter, name string) SyncImpler {
			return Must(meter).NewInt64Histogram(name + ".lastvalue")
		},
		getUpdateValue: func() number.Number {
			// Difference of two draws, so the value may be negative.
			first := rand.Int63()
			second := rand.Int63()
			return number.NewInt64Number(second - first)
		},
		operate: func(inst interface{}, ctx context.Context, value number.Number, labels []attribute.KeyValue) {
			inst.(metric.Int64Histogram).Record(ctx, value.AsInt64(), labels...)
		},
		newStore: func() interface{} {
			state := new(lastValueState)
			state.raw = number.NewInt64Number(0)
			return state
		},
		storeCollect: func(store interface{}, value number.Number, ts time.Time) {
			state := store.(*lastValueState)
			if ts.Before(state.ts) {
				// Older than what is already stored: ignore.
				return
			}
			state.ts = ts
			state.raw.SetInt64Atomic(value.AsInt64())
		},
		storeExpect: func(store interface{}, value number.Number) {
			store.(*lastValueState).raw.SetInt64Atomic(value.AsInt64())
		},
		readStore: func(store interface{}) number.Number {
			return store.(*lastValueState).raw.AsNumberAtomic()
		},
		equalValues: int64sEqual,
	}
}
func TestStressInt64LastValue(t *testing.T) {
stressTest(t, intLastValueTestImpl())
}
// floatLastValueTestImpl returns the testImpl for a float64 histogram
// instrument named name+".lastvalue". The expected store is simply
// overwritten on every update, while the collected store only accepts
// a value whose timestamp is not older than the one it already holds.
func floatLastValueTestImpl() testImpl {
	return testImpl{
		newInstrument: func(meter metric.Meter, name string) SyncImpler {
			return Must(meter).NewFloat64Histogram(name + ".lastvalue")
		},
		getUpdateValue: func() number.Number {
			// Uniform in [-50000, 50000).
			centered := -0.5 + rand.Float64()
			return number.NewFloat64Number(centered * 100000)
		},
		operate: func(inst interface{}, ctx context.Context, value number.Number, labels []attribute.KeyValue) {
			inst.(metric.Float64Histogram).Record(ctx, value.AsFloat64(), labels...)
		},
		newStore: func() interface{} {
			state := new(lastValueState)
			state.raw = number.NewFloat64Number(0)
			return state
		},
		storeCollect: func(store interface{}, value number.Number, ts time.Time) {
			state := store.(*lastValueState)
			if ts.Before(state.ts) {
				// Older than what is already stored: ignore.
				return
			}
			state.ts = ts
			state.raw.SetFloat64Atomic(value.AsFloat64())
		},
		storeExpect: func(store interface{}, value number.Number) {
			store.(*lastValueState).raw.SetFloat64Atomic(value.AsFloat64())
		},
		readStore: func(store interface{}) number.Number {
			return store.(*lastValueState).raw.AsNumberAtomic()
		},
		equalValues: float64sEqual,
	}
}
func TestStressFloat64LastValue(t *testing.T) {
stressTest(t, floatLastValueTestImpl())
}