1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-02-03 13:11:53 +02:00

[chore] Use errors.Join to unify errors (#5907)

Resolve #3544

Replace all custom implementations for multi-error unification with the
`errors.Join` function of the standard library.
This commit is contained in:
Tyler Yahn 2024-10-22 15:10:05 -07:00 committed by GitHub
parent 1a964cc449
commit 92ccad7bd9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 86 additions and 132 deletions

View File

@ -5,7 +5,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"context"
"fmt"
"errors"
"os"
"strings"
"sync"
@ -44,25 +44,13 @@ func (c config) readerSignals() (forceFlush, shutdown func(context.Context) erro
// value.
func unify(funcs []func(context.Context) error) func(context.Context) error {
return func(ctx context.Context) error {
var errs []error
var err error
for _, f := range funcs {
if err := f(ctx); err != nil {
errs = append(errs, err)
if e := f(ctx); e != nil {
err = errors.Join(err, e)
}
}
return unifyErrors(errs)
}
}
// unifyErrors combines multiple errors into a single error.
func unifyErrors(errs []error) error {
switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return fmt.Errorf("%v", errs)
return err
}
}

View File

@ -5,7 +5,7 @@ package metric
import (
"context"
"fmt"
"errors"
"testing"
"github.com/google/go-cmp/cmp"
@ -108,11 +108,19 @@ func TestConfigReaderSignalsForwardedErrors(t *testing.T) {
}
func TestUnifyMultiError(t *testing.T) {
f := func(context.Context) error { return assert.AnError }
funcs := []func(context.Context) error{f, f, f}
errs := []error{assert.AnError, assert.AnError, assert.AnError}
target := fmt.Errorf("%v", errs)
assert.Equal(t, unify(funcs)(context.Background()), target)
var (
e0 = errors.New("0")
e1 = errors.New("1")
e2 = errors.New("2")
)
err := unify([]func(context.Context) error{
func(ctx context.Context) error { return e0 },
func(ctx context.Context) error { return e1 },
func(ctx context.Context) error { return e2 },
})(context.Background())
assert.ErrorIs(t, err, e0)
assert.ErrorIs(t, err, e1)
assert.ErrorIs(t, err, e2)
}
func mergeResource(t *testing.T, r1, r2 *resource.Resource) *resource.Resource {

View File

@ -113,18 +113,17 @@ func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr
if err != nil {
return err
}
var errs []error
for _, producer := range mr.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
errs = append(errs, err)
externalMetrics, e := producer.Produce(ctx)
if e != nil {
err = errors.Join(err, e)
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
global.Debug("ManualReader collection", "Data", rm)
return unifyErrors(errs)
return err
}
// MarshalLog returns logging data about the ManualReader.

View File

@ -442,21 +442,21 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
}
reg := newObserver()
var errs multierror
var err error
for _, inst := range insts {
switch o := inst.(type) {
case int64Observable:
if err := o.registerable(m); err != nil {
if !errors.Is(err, errEmptyAgg) {
errs.append(err)
if e := o.registerable(m); e != nil {
if !errors.Is(e, errEmptyAgg) {
err = errors.Join(err, e)
}
continue
}
reg.registerInt64(o.observableID)
case float64Observable:
if err := o.registerable(m); err != nil {
if !errors.Is(err, errEmptyAgg) {
errs.append(err)
if e := o.registerable(m); e != nil {
if !errors.Is(e, errEmptyAgg) {
err = errors.Join(err, e)
}
continue
}
@ -467,7 +467,6 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
}
}
err := errs.errorOrNil()
if reg.len() == 0 {
// All insts use drop aggregation or are invalid.
return noopRegister{}, err

View File

@ -251,18 +251,17 @@ func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricd
if err != nil {
return err
}
var errs []error
for _, producer := range r.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
errs = append(errs, err)
externalMetrics, e := producer.Produce(ctx)
if e != nil {
err = errors.Join(err, e)
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
global.Debug("PeriodicReader collection", "Data", rm)
return unifyErrors(errs)
return err
}
// export exports metric data m using r's exporter.

View File

@ -8,7 +8,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
@ -108,11 +107,11 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
p.Lock()
defer p.Unlock()
var errs multierror
var err error
for _, c := range p.callbacks {
// TODO make the callbacks parallel. ( #3034 )
if err := c(ctx); err != nil {
errs.append(err)
if e := c(ctx); e != nil {
err = errors.Join(err, e)
}
if err := ctx.Err(); err != nil {
rm.Resource = nil
@ -123,8 +122,8 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
// TODO make the callbacks parallel. ( #3034 )
f := e.Value.(multiCallback)
if err := f(ctx); err != nil {
errs.append(err)
if e := f(ctx); e != nil {
err = errors.Join(err, e)
}
if err := ctx.Err(); err != nil {
// This means the context expired before we finished running callbacks.
@ -160,7 +159,7 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
rm.ScopeMetrics = rm.ScopeMetrics[:i]
return errs.errorOrNil()
return err
}
// inserter facilitates inserting of new instruments from a single scope into a
@ -222,7 +221,7 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
measures []aggregate.Measure[N]
)
errs := &multierror{wrapped: errCreatingAggregators}
var err error
seen := make(map[uint64]struct{})
for _, v := range i.pipeline.views {
stream, match := v(inst)
@ -230,9 +229,9 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
continue
}
matched = true
in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if err != nil {
errs.append(err)
in, id, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if e != nil {
err = errors.Join(err, e)
}
if in == nil { // Drop aggregation.
continue
@ -245,8 +244,12 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
measures = append(measures, in)
}
if err != nil {
err = errors.Join(errCreatingAggregators, err)
}
if matched {
return measures, errs.errorOrNil()
return measures, err
}
// Apply implicit default view if no explicit matched.
@ -255,15 +258,18 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
Description: inst.Description,
Unit: inst.Unit,
}
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if err != nil {
errs.append(err)
in, _, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if e != nil {
if err == nil {
err = errCreatingAggregators
}
err = errors.Join(err, e)
}
if in != nil {
// Ensured to have not seen given matched was false.
measures = append(measures, in)
}
return measures, errs.errorOrNil()
return measures, err
}
// addCallback registers a single instrument callback to be run when
@ -609,15 +615,15 @@ func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) reso
func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error) {
var measures []aggregate.Measure[N]
errs := &multierror{}
var err error
for _, i := range r.inserters {
in, err := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
if err != nil {
errs.append(err)
in, e := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
if e != nil {
err = errors.Join(err, e)
}
measures = append(measures, in...)
}
return measures, errs.errorOrNil()
return measures, err
}
// HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
@ -626,37 +632,18 @@ func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error)
func (r resolver[N]) HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[N], error) {
var measures []aggregate.Measure[N]
errs := &multierror{}
var err error
for _, i := range r.inserters {
agg := i.readerDefaultAggregation(id.Kind)
if histAgg, ok := agg.(AggregationExplicitBucketHistogram); ok && len(boundaries) > 0 {
histAgg.Boundaries = boundaries
agg = histAgg
}
in, err := i.Instrument(id, agg)
if err != nil {
errs.append(err)
in, e := i.Instrument(id, agg)
if e != nil {
err = errors.Join(err, e)
}
measures = append(measures, in...)
}
return measures, errs.errorOrNil()
}
type multierror struct {
wrapped error
errors []string
}
func (m *multierror) errorOrNil() error {
if len(m.errors) == 0 {
return nil
}
if m.wrapped == nil {
return errors.New(strings.Join(m.errors, "; "))
}
return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; "))
}
func (m *multierror) append(err error) {
m.errors = append(m.errors, err.Error())
return measures, err
}

View File

@ -117,7 +117,7 @@ func (ts *readerTestSuite) TestExternalProducerPartialSuccess() {
m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(context.Background(), &m)
ts.Equal(assert.AnError, err)
ts.ErrorIs(err, assert.AnError)
ts.Equal(testResourceMetricsAB, m)
}

View File

@ -7,7 +7,6 @@ import (
"context"
"errors"
"fmt"
"strings"
)
// ErrPartialResource is returned by a detector when complete source
@ -57,62 +56,37 @@ func Detect(ctx context.Context, detectors ...Detector) (*Resource, error) {
// these errors will be returned. Otherwise, nil is returned.
func detect(ctx context.Context, res *Resource, detectors []Detector) error {
var (
r *Resource
errs detectErrs
err error
r *Resource
err error
e error
)
for _, detector := range detectors {
if detector == nil {
continue
}
r, err = detector.Detect(ctx)
if err != nil {
errs = append(errs, err)
if !errors.Is(err, ErrPartialResource) {
r, e = detector.Detect(ctx)
if e != nil {
err = errors.Join(err, e)
if !errors.Is(e, ErrPartialResource) {
continue
}
}
r, err = Merge(res, r)
if err != nil {
errs = append(errs, err)
r, e = Merge(res, r)
if e != nil {
err = errors.Join(err, e)
}
*res = *r
}
if len(errs) == 0 {
return nil
if err != nil {
if errors.Is(err, ErrSchemaURLConflict) {
// If there has been a merge conflict, ensure the resource has no
// schema URL.
res.schemaURL = ""
}
err = fmt.Errorf("error detecting resource: %w", err)
}
if errors.Is(errs, ErrSchemaURLConflict) {
// If there has been a merge conflict, ensure the resource has no
// schema URL.
res.schemaURL = ""
}
return errs
}
type detectErrs []error
func (e detectErrs) Error() string {
errStr := make([]string, len(e))
for i, err := range e {
errStr[i] = fmt.Sprintf("* %s", err)
}
format := "%d errors occurred detecting resource:\n\t%s"
return fmt.Sprintf(format, len(e), strings.Join(errStr, "\n\t"))
}
func (e detectErrs) Unwrap() error {
switch len(e) {
case 0:
return nil
case 1:
return e[0]
}
return e[1:]
}
func (e detectErrs) Is(target error) bool {
return len(e) != 0 && errors.Is(e[0], target)
return err
}