1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2024-12-12 10:04:29 +02:00

Replace ErrNoLastValue and ErrEmptyDataSet by ErrNoData (#557)

Handle ForEach returning an error
This commit is contained in:
ET 2020-03-16 16:28:33 -07:00 committed by GitHub
parent 6ada85adba
commit 217a97d9b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 205 additions and 153 deletions

View File

@ -163,16 +163,15 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
var aggErr error var aggErr error
var sendErr error var sendErr error
checkpointSet.ForEach(func(rec export.Record) { aggErr = checkpointSet.ForEach(func(rec export.Record) error {
before := buf.Len() before := buf.Len()
if err := e.formatMetric(rec, buf); err != nil && aggErr == nil { if err := e.formatMetric(rec, buf); err != nil {
aggErr = err return err
return
} }
if buf.Len() < e.config.MaxPacketSize { if buf.Len() < e.config.MaxPacketSize {
return return nil
} }
if before == 0 { if before == 0 {
// A single metric >= packet size // A single metric >= packet size
@ -180,7 +179,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
sendErr = err sendErr = err
} }
buf.Reset() buf.Reset()
return return nil
} }
// Send and copy the leftover // Send and copy the leftover
@ -193,6 +192,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
copy(buf.Bytes()[0:leftover], buf.Bytes()[before:]) copy(buf.Bytes()[0:leftover], buf.Bytes()[before:])
buf.Truncate(leftover) buf.Truncate(leftover)
return nil
}) })
if err := e.send(buf.Bytes()); err != nil && sendErr == nil { if err := e.send(buf.Bytes()); err != nil && sendErr == nil {
sendErr = err sendErr = err

View File

@ -184,8 +184,9 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {
return return
} }
c.exp.snapshot.ForEach(func(record export.Record) { _ = c.exp.snapshot.ForEach(func(record export.Record) error {
ch <- c.toDesc(&record) ch <- c.toDesc(&record)
return nil
}) })
} }
@ -198,7 +199,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
return return
} }
c.exp.snapshot.ForEach(func(record export.Record) { _ = c.exp.snapshot.ForEach(func(record export.Record) error {
agg := record.Aggregator() agg := record.Aggregator()
numberKind := record.Descriptor().NumberKind() numberKind := record.Descriptor().NumberKind()
labels := labelValues(record.Labels()) labels := labelValues(record.Labels())
@ -222,6 +223,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
} else if lastValue, ok := agg.(aggregator.LastValue); ok { } else if lastValue, ok := agg.(aggregator.LastValue); ok {
c.exportLastValue(ch, lastValue, numberKind, desc, labels) c.exportLastValue(ch, lastValue, numberKind, desc, labels)
} }
return nil
}) })
} }

View File

@ -139,15 +139,13 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, e
} }
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
// N.B. Only return one aggError, if any occur. They're likely
// to be duplicates of the same error.
var aggError error var aggError error
var batch expoBatch var batch expoBatch
if !e.config.DoNotPrintTime { if !e.config.DoNotPrintTime {
ts := time.Now() ts := time.Now()
batch.Timestamp = &ts batch.Timestamp = &ts
} }
checkpointSet.ForEach(func(record export.Record) { aggError = checkpointSet.ForEach(func(record export.Record) error {
desc := record.Descriptor() desc := record.Descriptor()
agg := record.Aggregator() agg := record.Aggregator()
kind := desc.NumberKind() kind := desc.NumberKind()
@ -155,47 +153,31 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
var expose expoLine var expose expoLine
if sum, ok := agg.(aggregator.Sum); ok { if sum, ok := agg.(aggregator.Sum); ok {
if value, err := sum.Sum(); err != nil { value, err := sum.Sum()
aggError = err if err != nil {
expose.Sum = "NaN" return err
} else {
expose.Sum = value.AsInterface(kind)
} }
expose.Sum = value.AsInterface(kind)
} }
if mmsc, ok := agg.(aggregator.MinMaxSumCount); ok { if mmsc, ok := agg.(aggregator.MinMaxSumCount); ok {
if count, err := mmsc.Count(); err != nil { count, err := mmsc.Count()
aggError = err if err != nil {
expose.Count = "NaN" return err
} else {
expose.Count = count
} }
expose.Count = count
if max, err := mmsc.Max(); err != nil { max, err := mmsc.Max()
if err == aggregator.ErrEmptyDataSet { if err != nil {
// This is a special case, indicates an aggregator that return err
// was checkpointed before its first value was set.
return
}
aggError = err
expose.Max = "NaN"
} else {
expose.Max = max.AsInterface(kind)
} }
expose.Max = max.AsInterface(kind)
if min, err := mmsc.Min(); err != nil { min, err := mmsc.Min()
if err == aggregator.ErrEmptyDataSet { if err != nil {
// This is a special case, indicates an aggregator that return err
// was checkpointed before its first value was set.
return
}
aggError = err
expose.Min = "NaN"
} else {
expose.Min = min.AsInterface(kind)
} }
expose.Min = min.AsInterface(kind)
if dist, ok := agg.(aggregator.Distribution); ok && len(e.config.Quantiles) != 0 { if dist, ok := agg.(aggregator.Distribution); ok && len(e.config.Quantiles) != 0 {
summary := make([]expoQuantile, len(e.config.Quantiles)) summary := make([]expoQuantile, len(e.config.Quantiles))
@ -203,12 +185,11 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
for i, q := range e.config.Quantiles { for i, q := range e.config.Quantiles {
var vstr interface{} var vstr interface{}
if value, err := dist.Quantile(q); err != nil { value, err := dist.Quantile(q)
aggError = err if err != nil {
vstr = "NaN" return err
} else {
vstr = value.AsInterface(kind)
} }
vstr = value.AsInterface(kind)
summary[i] = expoQuantile{ summary[i] = expoQuantile{
Q: q, Q: q,
V: vstr, V: vstr,
@ -216,21 +197,14 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
} }
} }
} else if lv, ok := agg.(aggregator.LastValue); ok { } else if lv, ok := agg.(aggregator.LastValue); ok {
if value, timestamp, err := lv.LastValue(); err != nil { value, timestamp, err := lv.LastValue()
if err == aggregator.ErrNoLastValue { if err != nil {
// This is a special case, indicates an aggregator that return err
// was checkpointed before its first value was set. }
return expose.LastValue = value.AsInterface(kind)
}
aggError = err if !e.config.DoNotPrintTime {
expose.LastValue = "NaN" expose.Timestamp = &timestamp
} else {
expose.LastValue = value.AsInterface(kind)
if !e.config.DoNotPrintTime {
expose.Timestamp = &timestamp
}
} }
} }
@ -264,6 +238,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
expose.Name = sb.String() expose.Name = sb.String()
batch.Updates = append(batch.Updates, expose) batch.Updates = append(batch.Updates, expose)
return nil
}) })
var data []byte var data []byte

View File

@ -221,7 +221,7 @@ func TestStdoutMeasureFormat(t *testing.T) {
}`, fix.Output()) }`, fix.Output())
} }
func TestStdoutEmptyDataSet(t *testing.T) { func TestStdoutNoData(t *testing.T) {
desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind) desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind)
for name, tc := range map[string]export.Aggregator{ for name, tc := range map[string]export.Aggregator{
"ddsketch": ddsketch.New(ddsketch.NewDefaultConfig(), desc), "ddsketch": ddsketch.New(ddsketch.NewDefaultConfig(), desc),

View File

@ -2,9 +2,11 @@ package test
import ( import (
"context" "context"
"errors"
"go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/core"
export "go.opentelemetry.io/otel/sdk/export/metric" export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array" "go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
@ -82,8 +84,11 @@ func (p *CheckpointSet) updateAggregator(desc *export.Descriptor, newAgg export.
} }
} }
func (p *CheckpointSet) ForEach(f func(export.Record)) { func (p *CheckpointSet) ForEach(f func(export.Record) error) error {
for _, r := range p.updates { for _, r := range p.updates {
f(r) if err := f(r); err != nil && !errors.Is(err, aggregator.ErrNoData) {
return err
}
} }
return nil
} }

View File

@ -77,9 +77,9 @@ func TestMinMaxSumCountValue(t *testing.T) {
assert.NoError(t, mmsc.Update(context.Background(), 1, &metricsdk.Descriptor{})) assert.NoError(t, mmsc.Update(context.Background(), 1, &metricsdk.Descriptor{}))
assert.NoError(t, mmsc.Update(context.Background(), 10, &metricsdk.Descriptor{})) assert.NoError(t, mmsc.Update(context.Background(), 10, &metricsdk.Descriptor{}))
// Prior to checkpointing ErrEmptyDataSet should be returned. // Prior to checkpointing ErrNoData should be returned.
_, _, _, _, err := minMaxSumCountValues(mmsc) _, _, _, _, err := minMaxSumCountValues(mmsc)
assert.Error(t, err, aggregator.ErrEmptyDataSet) assert.EqualError(t, err, aggregator.ErrNoData.Error())
// Checkpoint to set non-zero values // Checkpoint to set non-zero values
mmsc.Checkpoint(context.Background(), &metricsdk.Descriptor{}) mmsc.Checkpoint(context.Background(), &metricsdk.Descriptor{})
@ -186,13 +186,13 @@ func TestMinMaxSumCountDatapoints(t *testing.T) {
} }
func TestMinMaxSumCountPropagatesErrors(t *testing.T) { func TestMinMaxSumCountPropagatesErrors(t *testing.T) {
// ErrEmptyDataSet should be returned by both the Min and Max values of // ErrNoData should be returned by both the Min and Max values of
// a MinMaxSumCount Aggregator. Use this fact to check the error is // a MinMaxSumCount Aggregator. Use this fact to check the error is
// correctly returned. // correctly returned.
mmsc := minmaxsumcount.New(&metricsdk.Descriptor{}) mmsc := minmaxsumcount.New(&metricsdk.Descriptor{})
_, _, _, _, err := minMaxSumCountValues(mmsc) _, _, _, _, err := minMaxSumCountValues(mmsc)
assert.Error(t, err) assert.Error(t, err)
assert.Equal(t, aggregator.ErrEmptyDataSet, err) assert.Equal(t, aggregator.ErrNoData, err)
} }
func TestSumMetricDescriptor(t *testing.T) { func TestSumMetricDescriptor(t *testing.T) {

View File

@ -217,12 +217,13 @@ func (e *Exporter) Export(ctx context.Context, cps metricsdk.CheckpointSet) erro
// Seed records into the work processing pool. // Seed records into the work processing pool.
records := make(chan metricsdk.Record) records := make(chan metricsdk.Record)
go func() { go func() {
cps.ForEach(func(record metricsdk.Record) { _ = cps.ForEach(func(record metricsdk.Record) (err error) {
select { select {
case <-e.stopCh: case <-e.stopCh:
case <-ctx.Done(): case <-ctx.Done():
case records <- record: case records <- record:
} }
return
}) })
close(records) close(records)
}() }()
@ -268,7 +269,7 @@ func (e *Exporter) processMetrics(ctx context.Context, out chan<- *metricpb.Metr
for r := range in { for r := range in {
m, err := transform.Record(r) m, err := transform.Record(r)
if err != nil { if err != nil {
if err == aggregator.ErrEmptyDataSet { if err == aggregator.ErrNoData {
// The Aggregator was checkpointed before the first value // The Aggregator was checkpointed before the first value
// was set, skipping. // was set, skipping.
continue continue

View File

@ -97,20 +97,12 @@ var (
ErrInvalidQuantile = fmt.Errorf("the requested quantile is out of range") ErrInvalidQuantile = fmt.Errorf("the requested quantile is out of range")
ErrNegativeInput = fmt.Errorf("negative value is out of range for this instrument") ErrNegativeInput = fmt.Errorf("negative value is out of range for this instrument")
ErrNaNInput = fmt.Errorf("NaN value is an invalid input") ErrNaNInput = fmt.Errorf("NaN value is an invalid input")
ErrNonMonotoneInput = fmt.Errorf("the new value is not monotone")
ErrInconsistentType = fmt.Errorf("inconsistent aggregator types") ErrInconsistentType = fmt.Errorf("inconsistent aggregator types")
// ErrNoLastValue is returned by the LastValue interface when // ErrNoData is returned when (due to a race with collection)
// (due to a race with collection) the Aggregator is // the Aggregator is check-pointed before the first value is set.
// checkpointed before the first value is set. The aggregator // The aggregator should simply be skipped in this case.
// should simply be skipped in this case. ErrNoData = fmt.Errorf("no data collected by this aggregator")
ErrNoLastValue = fmt.Errorf("no value has been set")
// ErrEmptyDataSet is returned by Max and Quantile interfaces
// when (due to a race with collection) the Aggregator is
// checkpointed before the first value is set. The aggregator
// should simply be skipped in this case.
ErrEmptyDataSet = fmt.Errorf("the result is not defined on an empty data set")
) )
// NewInconsistentMergeError formats an error describing an attempt to // NewInconsistentMergeError formats an error describing an attempt to

View File

@ -201,8 +201,13 @@ type LabelEncoder interface {
type CheckpointSet interface { type CheckpointSet interface {
// ForEach iterates over aggregated checkpoints for all // ForEach iterates over aggregated checkpoints for all
// metrics that were updated during the last collection // metrics that were updated during the last collection
// period. // period. Each aggregated checkpoint returned by the
ForEach(func(Record)) // function parameter may return an error.
// ForEach tolerates ErrNoData silently, as this is
// expected from the Meter implementation. Any other kind
// of error will immediately halt ForEach and return
// the error to the caller.
ForEach(func(Record) error) error
} }
// Record contains the exported data for a single metric instrument // Record contains the exported data for a single metric instrument

View File

@ -177,7 +177,7 @@ func (p *points) Swap(i, j int) {
// of a quantile. // of a quantile.
func (p *points) Quantile(q float64) (core.Number, error) { func (p *points) Quantile(q float64) (core.Number, error) {
if len(*p) == 0 { if len(*p) == 0 {
return core.Number(0), aggregator.ErrEmptyDataSet return core.Number(0), aggregator.ErrNoData
} }
if q < 0 || q > 1 { if q < 0 || q > 1 {

View File

@ -204,15 +204,15 @@ func TestArrayErrors(t *testing.T) {
_, err := agg.Max() _, err := agg.Max()
require.Error(t, err) require.Error(t, err)
require.Equal(t, err, aggregator.ErrEmptyDataSet) require.Equal(t, err, aggregator.ErrNoData)
_, err = agg.Min() _, err = agg.Min()
require.Error(t, err) require.Error(t, err)
require.Equal(t, err, aggregator.ErrEmptyDataSet) require.Equal(t, err, aggregator.ErrNoData)
_, err = agg.Quantile(0.1) _, err = agg.Quantile(0.1)
require.Error(t, err) require.Error(t, err)
require.Equal(t, err, aggregator.ErrEmptyDataSet) require.Equal(t, err, aggregator.ErrNoData)
ctx := context.Background() ctx := context.Background()

View File

@ -85,7 +85,7 @@ func (c *Aggregator) Min() (core.Number, error) {
// It is an error if `q` is less than 0 or greated than 1. // It is an error if `q` is less than 0 or greated than 1.
func (c *Aggregator) Quantile(q float64) (core.Number, error) { func (c *Aggregator) Quantile(q float64) (core.Number, error) {
if c.checkpoint.Count() == 0 { if c.checkpoint.Count() == 0 {
return core.Number(0), aggregator.ErrEmptyDataSet return core.Number(0), aggregator.ErrNoData
} }
f := c.checkpoint.Quantile(q) f := c.checkpoint.Quantile(q)
if math.IsNaN(f) { if math.IsNaN(f) {

View File

@ -68,13 +68,13 @@ func New() *Aggregator {
} }
// LastValue returns the last-recorded lastValue value and the // LastValue returns the last-recorded lastValue value and the
// corresponding timestamp. The error value aggregator.ErrNoLastValue // corresponding timestamp. The error value aggregator.ErrNoData
// will be returned if (due to a race condition) the checkpoint was // will be returned if (due to a race condition) the checkpoint was
// computed before the first value was set. // computed before the first value was set.
func (g *Aggregator) LastValue() (core.Number, time.Time, error) { func (g *Aggregator) LastValue() (core.Number, time.Time, error) {
gd := (*lastValueData)(g.checkpoint) gd := (*lastValueData)(g.checkpoint)
if gd == unsetLastValue { if gd == unsetLastValue {
return core.Number(0), time.Time{}, aggregator.ErrNoLastValue return core.Number(0), time.Time{}, aggregator.ErrNoData
} }
return gd.value.AsNumber(), gd.timestamp, nil return gd.value.AsNumber(), gd.timestamp, nil
} }

View File

@ -113,7 +113,7 @@ func TestLastValueNotSet(t *testing.T) {
g.Checkpoint(context.Background(), descriptor) g.Checkpoint(context.Background(), descriptor)
value, timestamp, err := g.LastValue() value, timestamp, err := g.LastValue()
require.Equal(t, aggregator.ErrNoLastValue, err) require.Equal(t, aggregator.ErrNoData, err)
require.True(t, timestamp.IsZero()) require.True(t, timestamp.IsZero())
require.Equal(t, core.Number(0), value) require.Equal(t, core.Number(0), value)
} }

View File

@ -86,25 +86,25 @@ func (c *Aggregator) Count() (int64, error) {
} }
// Min returns the minimum value in the checkpoint. // Min returns the minimum value in the checkpoint.
// The error value aggregator.ErrEmptyDataSet will be returned // The error value aggregator.ErrNoData will be returned
// if there were no measurements recorded during the checkpoint. // if there were no measurements recorded during the checkpoint.
func (c *Aggregator) Min() (core.Number, error) { func (c *Aggregator) Min() (core.Number, error) {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
if c.checkpoint().count.IsZero(core.Uint64NumberKind) { if c.checkpoint().count.IsZero(core.Uint64NumberKind) {
return c.kind.Zero(), aggregator.ErrEmptyDataSet return c.kind.Zero(), aggregator.ErrNoData
} }
return c.checkpoint().min, nil return c.checkpoint().min, nil
} }
// Max returns the maximum value in the checkpoint. // Max returns the maximum value in the checkpoint.
// The error value aggregator.ErrEmptyDataSet will be returned // The error value aggregator.ErrNoData will be returned
// if there were no measurements recorded during the checkpoint. // if there were no measurements recorded during the checkpoint.
func (c *Aggregator) Max() (core.Number, error) { func (c *Aggregator) Max() (core.Number, error) {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
if c.checkpoint().count.IsZero(core.Uint64NumberKind) { if c.checkpoint().count.IsZero(core.Uint64NumberKind) {
return c.kind.Zero(), aggregator.ErrEmptyDataSet return c.kind.Zero(), aggregator.ErrNoData
} }
return c.checkpoint().max, nil return c.checkpoint().max, nil
} }

View File

@ -234,7 +234,7 @@ func TestMaxSumCountNotSet(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
max, err := agg.Max() max, err := agg.Max()
require.Equal(t, aggregator.ErrEmptyDataSet, err) require.Equal(t, aggregator.ErrNoData, err)
require.Equal(t, core.Number(0), max) require.Equal(t, core.Number(0), max)
}) })
} }

View File

@ -16,9 +16,11 @@ package defaultkeys // import "go.opentelemetry.io/otel/sdk/metric/batcher/defau
import ( import (
"context" "context"
"errors"
"go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/core"
export "go.opentelemetry.io/otel/sdk/export/metric" export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
) )
type ( type (
@ -153,8 +155,11 @@ func (b *Batcher) FinishedCollection() {
} }
} }
func (p *checkpointSet) ForEach(f func(export.Record)) { func (p *checkpointSet) ForEach(f func(export.Record) error) error {
for _, entry := range p.aggCheckpointMap { for _, entry := range p.aggCheckpointMap {
f(entry) if err := f(entry); err != nil && !errors.Is(err, aggregator.ErrNoData) {
return err
}
} }
return nil
} }

View File

@ -50,7 +50,8 @@ func TestGroupingStateless(t *testing.T) {
b.FinishedCollection() b.FinishedCollection()
records := test.Output{} records := test.Output{}
checkpointSet.ForEach(records.AddTo) err := checkpointSet.ForEach(records.AddTo)
require.NoError(t, err)
// Repeat for {counter,lastvalue}.{1,2}. // Repeat for {counter,lastvalue}.{1,2}.
// Output lastvalue should have only the "G=H" and "G=" keys. // Output lastvalue should have only the "G=H" and "G=" keys.
@ -69,8 +70,9 @@ func TestGroupingStateless(t *testing.T) {
// Verify that state is reset by FinishedCollection() // Verify that state is reset by FinishedCollection()
checkpointSet = b.CheckpointSet() checkpointSet = b.CheckpointSet()
b.FinishedCollection() b.FinishedCollection()
checkpointSet.ForEach(func(rec export.Record) { _ = checkpointSet.ForEach(func(rec export.Record) error {
t.Fatal("Unexpected call") t.Fatal("Unexpected call")
return nil
}) })
} }
@ -90,7 +92,8 @@ func TestGroupingStateful(t *testing.T) {
b.FinishedCollection() b.FinishedCollection()
records1 := test.Output{} records1 := test.Output{}
checkpointSet.ForEach(records1.AddTo) err := checkpointSet.ForEach(records1.AddTo)
require.NoError(t, err)
require.EqualValues(t, map[string]int64{ require.EqualValues(t, map[string]int64{
"sum.a/C=D": 10, // labels1 "sum.a/C=D": 10, // labels1
@ -102,7 +105,8 @@ func TestGroupingStateful(t *testing.T) {
b.FinishedCollection() b.FinishedCollection()
records2 := test.Output{} records2 := test.Output{}
checkpointSet.ForEach(records2.AddTo) err = checkpointSet.ForEach(records2.AddTo)
require.NoError(t, err)
require.EqualValues(t, records1, records2) require.EqualValues(t, records1, records2)
@ -118,7 +122,8 @@ func TestGroupingStateful(t *testing.T) {
b.FinishedCollection() b.FinishedCollection()
records3 := test.Output{} records3 := test.Output{}
checkpointSet.ForEach(records3.AddTo) err = checkpointSet.ForEach(records3.AddTo)
require.NoError(t, err)
require.EqualValues(t, records1, records3) require.EqualValues(t, records1, records3)
@ -130,7 +135,8 @@ func TestGroupingStateful(t *testing.T) {
b.FinishedCollection() b.FinishedCollection()
records4 := test.Output{} records4 := test.Output{}
checkpointSet.ForEach(records4.AddTo) err = checkpointSet.ForEach(records4.AddTo)
require.NoError(t, err)
require.EqualValues(t, map[string]int64{ require.EqualValues(t, map[string]int64{
"sum.a/C=D": 30, "sum.a/C=D": 30,

View File

@ -134,7 +134,7 @@ func CounterAgg(desc *export.Descriptor, v int64) export.Aggregator {
// AddTo adds a name/label-encoding entry with the lastValue or counter // AddTo adds a name/label-encoding entry with the lastValue or counter
// value to the output map. // value to the output map.
func (o Output) AddTo(rec export.Record) { func (o Output) AddTo(rec export.Record) error {
labels := rec.Labels() labels := rec.Labels()
key := fmt.Sprint(rec.Descriptor().Name(), "/", labels.Encoded()) key := fmt.Sprint(rec.Descriptor().Name(), "/", labels.Encoded())
var value int64 var value int64
@ -147,4 +147,5 @@ func (o Output) AddTo(rec export.Record) {
value = lv.AsInt64() value = lv.AsInt64()
} }
o[key] = value o[key] = value
return nil
} }

View File

@ -16,8 +16,10 @@ package ungrouped // import "go.opentelemetry.io/otel/sdk/metric/batcher/ungroup
import ( import (
"context" "context"
"errors"
export "go.opentelemetry.io/otel/sdk/export/metric" export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
) )
type ( type (
@ -101,12 +103,15 @@ func (b *Batcher) FinishedCollection() {
} }
} }
func (c batchMap) ForEach(f func(export.Record)) { func (c batchMap) ForEach(f func(export.Record) error) error {
for key, value := range c { for key, value := range c {
f(export.NewRecord( if err := f(export.NewRecord(
key.descriptor, key.descriptor,
value.labels, value.labels,
value.aggregator, value.aggregator,
)) )); err != nil && !errors.Is(err, aggregator.ErrNoData) {
return err
}
} }
return nil
} }

View File

@ -62,7 +62,7 @@ func TestUngroupedStateless(t *testing.T) {
b.FinishedCollection() b.FinishedCollection()
records := test.Output{} records := test.Output{}
checkpointSet.ForEach(records.AddTo) _ = checkpointSet.ForEach(records.AddTo)
// Output lastvalue should have only the "G=H" and "G=" keys. // Output lastvalue should have only the "G=H" and "G=" keys.
// Output counter should have only the "C=D" and "C=" keys. // Output counter should have only the "C=D" and "C=" keys.
@ -84,8 +84,9 @@ func TestUngroupedStateless(t *testing.T) {
// Verify that state was reset // Verify that state was reset
checkpointSet = b.CheckpointSet() checkpointSet = b.CheckpointSet()
b.FinishedCollection() b.FinishedCollection()
checkpointSet.ForEach(func(rec export.Record) { _ = checkpointSet.ForEach(func(rec export.Record) error {
t.Fatal("Unexpected call") t.Fatal("Unexpected call")
return nil
}) })
} }
@ -105,7 +106,7 @@ func TestUngroupedStateful(t *testing.T) {
b.FinishedCollection() b.FinishedCollection()
records1 := test.Output{} records1 := test.Output{}
checkpointSet.ForEach(records1.AddTo) _ = checkpointSet.ForEach(records1.AddTo)
require.EqualValues(t, map[string]int64{ require.EqualValues(t, map[string]int64{
"sum.a/G~H&C~D": 10, // labels1 "sum.a/G~H&C~D": 10, // labels1
@ -117,7 +118,7 @@ func TestUngroupedStateful(t *testing.T) {
b.FinishedCollection() b.FinishedCollection()
records2 := test.Output{} records2 := test.Output{}
checkpointSet.ForEach(records2.AddTo) _ = checkpointSet.ForEach(records2.AddTo)
require.EqualValues(t, records1, records2) require.EqualValues(t, records1, records2)
@ -133,7 +134,7 @@ func TestUngroupedStateful(t *testing.T) {
b.FinishedCollection() b.FinishedCollection()
records3 := test.Output{} records3 := test.Output{}
checkpointSet.ForEach(records3.AddTo) _ = checkpointSet.ForEach(records3.AddTo)
require.EqualValues(t, records1, records3) require.EqualValues(t, records1, records3)
@ -145,7 +146,7 @@ func TestUngroupedStateful(t *testing.T) {
b.FinishedCollection() b.FinishedCollection()
records4 := test.Output{} records4 := test.Output{}
checkpointSet.ForEach(records4.AddTo) _ = checkpointSet.ForEach(records4.AddTo)
require.EqualValues(t, map[string]int64{ require.EqualValues(t, map[string]int64{
"sum.a/G~H&C~D": 30, "sum.a/G~H&C~D": 30,

View File

@ -190,10 +190,10 @@ type syncCheckpointSet struct {
var _ export.CheckpointSet = (*syncCheckpointSet)(nil) var _ export.CheckpointSet = (*syncCheckpointSet)(nil)
func (c syncCheckpointSet) ForEach(fn func(export.Record)) { func (c syncCheckpointSet) ForEach(fn func(export.Record) error) error {
c.mtx.Lock() c.mtx.Lock()
defer c.mtx.Unlock() defer c.mtx.Unlock()
c.delegate.ForEach(fn) return c.delegate.ForEach(fn)
} }
func (realClock) Now() time.Time { func (realClock) Now() time.Time {

View File

@ -43,11 +43,11 @@ type testBatcher struct {
} }
type testExporter struct { type testExporter struct {
t *testing.T t *testing.T
lock sync.Mutex lock sync.Mutex
exports int exports int
records []export.Record records []export.Record
retErr error injectErr func(r export.Record) error
} }
type testFixture struct { type testFixture struct {
@ -118,10 +118,20 @@ func (e *testExporter) Export(_ context.Context, checkpointSet export.Checkpoint
e.lock.Lock() e.lock.Lock()
defer e.lock.Unlock() defer e.lock.Unlock()
e.exports++ e.exports++
checkpointSet.ForEach(func(r export.Record) { var records []export.Record
e.records = append(e.records, r) if err := checkpointSet.ForEach(func(r export.Record) error {
}) if e.injectErr != nil {
return e.retErr if err := e.injectErr(r); err != nil {
return err
}
}
records = append(records, r)
return nil
}); err != nil {
return err
}
e.records = records
return nil
} }
func (e *testExporter) resetRecords() ([]export.Record, int) { func (e *testExporter) resetRecords() ([]export.Record, int) {
@ -230,37 +240,81 @@ func TestPushTicker(t *testing.T) {
} }
func TestPushExportError(t *testing.T) { func TestPushExportError(t *testing.T) {
fix := newFixture(t) injector := func(name string, e error) func(r export.Record) error {
fix.exporter.retErr = fmt.Errorf("test export error") return func(r export.Record) error {
if r.Descriptor().Name() == name {
return e
}
return nil
}
}
var errAggregator = fmt.Errorf("unexpected error")
var tests = []struct {
name string
injectedError error
expectedDescriptors []string
expectedError error
}{
{"errNone", nil, []string{"counter1", "counter2"}, nil},
{"errNoData", aggregator.ErrNoData, []string{"counter2"}, nil},
{"errUnexpected", errAggregator, []string{}, errAggregator},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fix := newFixture(t)
fix.exporter.injectErr = injector("counter1", tt.injectedError)
p := push.New(fix.batcher, fix.exporter, time.Second) p := push.New(fix.batcher, fix.exporter, time.Second)
var err error var err error
var lock sync.Mutex var lock sync.Mutex
p.SetErrorHandler(func(sdkErr error) { p.SetErrorHandler(func(sdkErr error) {
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()
err = sdkErr err = sdkErr
}) })
mock := mockClock{clock.NewMock()} mock := mockClock{clock.NewMock()}
p.SetClock(mock) p.SetClock(mock)
p.Start() ctx := context.Background()
runtime.Gosched()
require.Equal(t, 0, fix.exporter.exports) meter := p.Meter("name")
require.Nil(t, err) counter1 := metric.Must(meter).NewInt64Counter("counter1")
counter2 := metric.Must(meter).NewInt64Counter("counter2")
mock.Add(time.Second) p.Start()
runtime.Gosched() runtime.Gosched()
lock.Lock() counter1.Add(ctx, 3, meter.Labels())
_, exports := fix.batcher.getCounts() counter2.Add(ctx, 5, meter.Labels())
require.Equal(t, 1, exports)
require.Error(t, err)
require.Equal(t, fix.exporter.retErr, err)
lock.Unlock()
p.Stop() require.Equal(t, 0, fix.exporter.exports)
require.Nil(t, err)
mock.Add(time.Second)
runtime.Gosched()
records, exports := fix.exporter.resetRecords()
checkpoints, finishes := fix.batcher.getCounts()
require.Equal(t, 1, exports)
require.Equal(t, 1, checkpoints)
require.Equal(t, 1, finishes)
lock.Lock()
if tt.expectedError == nil {
require.NoError(t, err)
} else {
require.Error(t, err)
require.Equal(t, tt.expectedError, err)
}
lock.Unlock()
require.Equal(t, len(tt.expectedDescriptors), len(records))
for _, r := range records {
require.Contains(t, tt.expectedDescriptors, r.Descriptor().Name())
}
p.Stop()
})
}
} }

View File

@ -272,7 +272,7 @@ func (f *testFixture) Process(_ context.Context, record export.Record) error {
f.impl.storeCollect(actual, sum, time.Time{}) f.impl.storeCollect(actual, sum, time.Time{})
case export.MeasureKind: case export.MeasureKind:
lv, ts, err := agg.(aggregator.LastValue).LastValue() lv, ts, err := agg.(aggregator.LastValue).LastValue()
if err != nil && err != aggregator.ErrNoLastValue { if err != nil && err != aggregator.ErrNoData {
f.T.Fatal("Last value error: ", err) f.T.Fatal("Last value error: ", err)
} }
f.impl.storeCollect(actual, lv, ts) f.impl.storeCollect(actual, lv, ts)