1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2024-12-04 09:43:23 +02:00

Update metric SDK to use oterror.Handler

This commit is contained in:
Tyler Yahn 2020-06-02 11:30:09 -07:00
parent cff4ddf888
commit 0b928ed0a1
No known key found for this signature in database
GPG Key ID: 42AA23B0BC85B798
9 changed files with 78 additions and 241 deletions

View File

@ -14,16 +14,12 @@
package metric
import "go.opentelemetry.io/otel/sdk/resource"
import (
"go.opentelemetry.io/otel/sdk/resource"
)
// Config contains configuration for an SDK.
type Config struct {
// ErrorHandler is the function called when the SDK encounters an error.
//
// This option can be overridden after instantiation of the SDK
// with the `SetErrorHandler` method.
ErrorHandler ErrorHandler
// Resource describes all the metric records processed by the
// Accumulator.
Resource *resource.Resource
@ -35,17 +31,6 @@ type Option interface {
Apply(*Config)
}
// WithErrorHandler sets the ErrorHandler configuration option of a Config.
func WithErrorHandler(fn ErrorHandler) Option {
return errorHandlerOption(fn)
}
type errorHandlerOption ErrorHandler
func (o errorHandlerOption) Apply(config *Config) {
config.ErrorHandler = ErrorHandler(o)
}
// WithResource sets the Resource configuration option of a Config.
func WithResource(res *resource.Resource) Option {
return resourceOption{res}

View File

@ -1,45 +0,0 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestWithErrorHandler(t *testing.T) {
errH, reg := func() (ErrorHandler, *error) {
e := fmt.Errorf("default invalid")
reg := &e
return func(err error) {
*reg = err
}, reg
}()
c := &Config{}
WithErrorHandler(errH).Apply(c)
err1 := fmt.Errorf("error 1")
c.ErrorHandler(err1)
assert.EqualError(t, *reg, err1.Error())
// Ensure overwriting works.
c = &Config{ErrorHandler: DefaultErrorHandler}
WithErrorHandler(errH).Apply(c)
err2 := fmt.Errorf("error 2")
c.ErrorHandler(err2)
assert.EqualError(t, *reg, err2.Error())
}

View File

@ -17,17 +17,11 @@ package pull
import (
"time"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)
// Config contains configuration for a push Controller.
type Config struct {
// ErrorHandler is the function called when the Controller encounters an error.
//
// This option can be overridden after instantiation of the Controller
// with the `SetErrorHandler` method.
ErrorHandler sdk.ErrorHandler
// Resource is the OpenTelemetry resource associated with all Meters
// created by the Controller.
@ -52,17 +46,6 @@ type Option interface {
Apply(*Config)
}
// WithErrorHandler sets the ErrorHandler configuration option of a Config.
func WithErrorHandler(fn sdk.ErrorHandler) Option {
return errorHandlerOption(fn)
}
type errorHandlerOption sdk.ErrorHandler
func (o errorHandlerOption) Apply(config *Config) {
config.ErrorHandler = sdk.ErrorHandler(o)
}
// WithResource sets the Resource configuration option of a Config.
func WithResource(r *resource.Resource) Option {
return resourceOption{r}

View File

@ -47,9 +47,8 @@ type Controller struct {
// New returns a *Controller configured with an aggregation selector and options.
func New(selector export.AggregationSelector, options ...Option) *Controller {
config := &Config{
Resource: resource.Empty(),
ErrorHandler: sdk.DefaultErrorHandler,
CachePeriod: DefaultCachePeriod,
Resource: resource.Empty(),
CachePeriod: DefaultCachePeriod,
}
for _, opt := range options {
opt.Apply(config)
@ -58,7 +57,6 @@ func New(selector export.AggregationSelector, options ...Option) *Controller {
accum := sdk.NewAccumulator(
integrator,
sdk.WithResource(config.Resource),
sdk.WithErrorHandler(config.ErrorHandler),
)
return &Controller{
accumulator: accum,

View File

@ -17,18 +17,11 @@ package push
import (
"time"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)
// Config contains configuration for a push Controller.
type Config struct {
// ErrorHandler is the function called when the Controller encounters an error.
//
// This option can be overridden after instantiation of the Controller
// with the `SetErrorHandler` method.
ErrorHandler sdk.ErrorHandler
// Resource is the OpenTelemetry resource associated with all Meters
// created by the Controller.
Resource *resource.Resource
@ -53,17 +46,6 @@ type Option interface {
Apply(*Config)
}
// WithErrorHandler sets the ErrorHandler configuration option of a Config.
func WithErrorHandler(fn sdk.ErrorHandler) Option {
return errorHandlerOption(fn)
}
type errorHandlerOption sdk.ErrorHandler
func (o errorHandlerOption) Apply(config *Config) {
config.ErrorHandler = sdk.ErrorHandler(o)
}
// WithResource sets the Resource configuration option of a Config.
func WithResource(r *resource.Resource) Option {
return resourceOption{r}

View File

@ -15,40 +15,15 @@
package push
import (
"fmt"
"testing"
"go.opentelemetry.io/otel/api/kv"
"github.com/stretchr/testify/assert"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)
func TestWithErrorHandler(t *testing.T) {
errH, reg := func() (sdk.ErrorHandler, *error) {
e := fmt.Errorf("default invalid")
reg := &e
return func(err error) {
*reg = err
}, reg
}()
c := &Config{}
WithErrorHandler(errH).Apply(c)
err1 := fmt.Errorf("error 1")
c.ErrorHandler(err1)
assert.EqualError(t, *reg, err1.Error())
// Ensure overwriting works.
c = &Config{ErrorHandler: sdk.DefaultErrorHandler}
WithErrorHandler(errH).Apply(c)
err2 := fmt.Errorf("error 2")
c.ErrorHandler(err2)
assert.EqualError(t, *reg, err2.Error())
}
func TestWithResource(t *testing.T) {
r := resource.New(kv.String("A", "a"))

View File

@ -19,6 +19,7 @@ import (
"sync"
"time"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/metric/registry"
export "go.opentelemetry.io/otel/sdk/export/metric"
@ -32,18 +33,17 @@ const DefaultPushPeriod = 10 * time.Second
// Controller organizes a periodic push of metric data.
type Controller struct {
lock sync.Mutex
accumulator *sdk.Accumulator
provider *registry.Provider
errorHandler sdk.ErrorHandler
integrator *simple.Integrator
exporter export.Exporter
wg sync.WaitGroup
ch chan struct{}
period time.Duration
timeout time.Duration
clock controllerTime.Clock
ticker controllerTime.Ticker
lock sync.Mutex
accumulator *sdk.Accumulator
provider *registry.Provider
integrator *simple.Integrator
exporter export.Exporter
wg sync.WaitGroup
ch chan struct{}
period time.Duration
timeout time.Duration
clock controllerTime.Clock
ticker controllerTime.Ticker
}
// New constructs a Controller, an implementation of metric.Provider,
@ -51,8 +51,7 @@ type Controller struct {
// periodic collection.
func New(selector export.AggregationSelector, exporter export.Exporter, opts ...Option) *Controller {
c := &Config{
ErrorHandler: sdk.DefaultErrorHandler,
Period: DefaultPushPeriod,
Period: DefaultPushPeriod,
}
for _, opt := range opts {
opt.Apply(c)
@ -64,19 +63,17 @@ func New(selector export.AggregationSelector, exporter export.Exporter, opts ...
integrator := simple.New(selector, c.Stateful)
impl := sdk.NewAccumulator(
integrator,
sdk.WithErrorHandler(c.ErrorHandler),
sdk.WithResource(c.Resource),
)
return &Controller{
provider: registry.NewProvider(impl),
accumulator: impl,
integrator: integrator,
exporter: exporter,
errorHandler: c.ErrorHandler,
ch: make(chan struct{}),
period: c.Period,
timeout: c.Timeout,
clock: controllerTime.RealClock{},
provider: registry.NewProvider(impl),
accumulator: impl,
integrator: integrator,
exporter: exporter,
ch: make(chan struct{}),
period: c.Period,
timeout: c.Timeout,
clock: controllerTime.RealClock{},
}
}
@ -88,15 +85,6 @@ func (c *Controller) SetClock(clock controllerTime.Clock) {
c.clock = clock
}
// SetErrorHandler sets the handler for errors. If none has been set, the
// SDK default error handler is used.
func (c *Controller) SetErrorHandler(errorHandler sdk.ErrorHandler) {
c.lock.Lock()
defer c.lock.Unlock()
c.errorHandler = errorHandler
c.accumulator.SetErrorHandler(errorHandler)
}
// Provider returns a metric.Provider instance for this controller.
func (c *Controller) Provider() metric.Provider {
return c.provider
@ -160,6 +148,6 @@ func (c *Controller) tick() {
c.integrator.FinishedCollection()
if err != nil {
c.errorHandler(err)
global.Handle(err)
}
}

View File

@ -19,12 +19,12 @@ import (
"fmt"
"math"
"strings"
"sync"
"sync/atomic"
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
@ -40,42 +40,50 @@ import (
var Must = metric.Must
var testResource = resource.New(kv.String("R", "V"))
type handler struct{ err error }
func (h *handler) Handle(err error) {
h.err = err
}
func (h *handler) Reset() {
h.err = nil
}
func (h *handler) Flush() error {
err := h.err
h.Reset()
return err
}
var testHandler *handler
func init() {
testHandler = new(handler)
global.SetHandler(testHandler)
}
type correctnessIntegrator struct {
newAggCount int64
t *testing.T
records []export.Record
sync.Mutex
err error
}
func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessIntegrator) {
testHandler.Reset()
integrator := &correctnessIntegrator{
t: t,
}
accum := metricsdk.NewAccumulator(
integrator,
metricsdk.WithResource(testResource),
metricsdk.WithErrorHandler(func(err error) {
integrator.Lock()
defer integrator.Unlock()
integrator.err = err
}),
)
meter := metric.WrapMeterImpl(accum, "test")
return meter, accum, integrator
}
func (ci *correctnessIntegrator) sdkErr() error {
ci.Lock()
defer ci.Unlock()
t := ci.err
ci.err = nil
return t
}
func (ci *correctnessIntegrator) AggregatorFor(descriptor *metric.Descriptor) (agg export.Aggregator) {
name := descriptor.Name()
@ -110,16 +118,10 @@ func TestInputRangeCounter(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
sdkErr = handleErr
})
counter := Must(meter).NewInt64Counter("name.counter")
counter.Add(ctx, -1)
require.Equal(t, aggregator.ErrNegativeInput, sdkErr)
sdkErr = nil
require.Equal(t, aggregator.ErrNegativeInput, testHandler.Flush())
checkpointed := sdk.Collect(ctx)
require.Equal(t, 0, checkpointed)
@ -131,18 +133,13 @@ func TestInputRangeCounter(t *testing.T) {
require.Equal(t, int64(1), sum.AsInt64())
require.Equal(t, 1, checkpointed)
require.Nil(t, err)
require.Nil(t, sdkErr)
require.Nil(t, testHandler.Flush())
}
func TestInputRangeUpDownCounter(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
sdkErr = handleErr
})
counter := Must(meter).NewInt64UpDownCounter("name.updowncounter")
counter.Add(ctx, -1)
@ -155,23 +152,17 @@ func TestInputRangeUpDownCounter(t *testing.T) {
require.Equal(t, int64(1), sum.AsInt64())
require.Equal(t, 1, checkpointed)
require.Nil(t, err)
require.Nil(t, sdkErr)
require.Nil(t, testHandler.Flush())
}
func TestInputRangeValueRecorder(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
sdkErr = handleErr
})
valuerecorder := Must(meter).NewFloat64ValueRecorder("name.valuerecorder")
valuerecorder.Record(ctx, math.NaN())
require.Equal(t, aggregator.ErrNaNInput, sdkErr)
sdkErr = nil
require.Equal(t, aggregator.ErrNaNInput, testHandler.Flush())
checkpointed := sdk.Collect(ctx)
require.Equal(t, 0, checkpointed)
@ -185,7 +176,7 @@ func TestInputRangeValueRecorder(t *testing.T) {
count, err := integrator.records[0].Aggregator().(aggregator.Distribution).Count()
require.Equal(t, int64(2), count)
require.Equal(t, 1, checkpointed)
require.Nil(t, sdkErr)
require.Nil(t, testHandler.Flush())
require.Nil(t, err)
}
@ -204,17 +195,13 @@ func TestDisabledInstrument(t *testing.T) {
func TestRecordNaN(t *testing.T) {
ctx := context.Background()
meter, sdk, _ := newSDK(t)
meter, _, _ := newSDK(t)
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
sdkErr = handleErr
})
c := Must(meter).NewFloat64Counter("sum.name")
require.Nil(t, sdkErr)
require.Nil(t, testHandler.Flush())
c.Add(ctx, math.NaN())
require.Error(t, sdkErr)
require.Error(t, testHandler.Flush())
}
func TestSDKLabelsDeduplication(t *testing.T) {
@ -395,15 +382,15 @@ func TestSumObserverInputRange(t *testing.T) {
_ = Must(meter).NewFloat64SumObserver("float.sumobserver", func(_ context.Context, result metric.Float64ObserverResult) {
result.Observe(-2, kv.String("A", "B"))
require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr())
require.Equal(t, aggregator.ErrNegativeInput, testHandler.Flush())
result.Observe(-1, kv.String("C", "D"))
require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr())
require.Equal(t, aggregator.ErrNegativeInput, testHandler.Flush())
})
_ = Must(meter).NewInt64SumObserver("int.sumobserver", func(_ context.Context, result metric.Int64ObserverResult) {
result.Observe(-1, kv.String("A", "B"))
require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr())
require.Equal(t, aggregator.ErrNegativeInput, testHandler.Flush())
result.Observe(-1)
require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr())
require.Equal(t, aggregator.ErrNegativeInput, testHandler.Flush())
})
collected := sdk.Collect(ctx)
@ -412,7 +399,7 @@ func TestSumObserverInputRange(t *testing.T) {
require.Equal(t, 0, len(integrator.records))
// check that the error condition was reset
require.NoError(t, integrator.sdkErr())
require.NoError(t, testHandler.Flush())
}
func TestObserverBatch(t *testing.T) {
@ -553,7 +540,7 @@ func TestIncorrectInstruments(t *testing.T) {
var observer metric.Int64ValueObserver
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
meter, sdk, _ := newSDK(t)
// Now try with uninitialized instruments.
meter.RecordBatch(ctx, nil, counter.Measurement(1))
@ -562,7 +549,7 @@ func TestIncorrectInstruments(t *testing.T) {
})
collected := sdk.Collect(ctx)
require.Equal(t, metricsdk.ErrUninitializedInstrument, integrator.sdkErr())
require.Equal(t, metricsdk.ErrUninitializedInstrument, testHandler.Flush())
require.Equal(t, 0, collected)
// Now try with instruments from another SDK.
@ -579,7 +566,7 @@ func TestIncorrectInstruments(t *testing.T) {
collected = sdk.Collect(ctx)
require.Equal(t, 0, collected)
require.Equal(t, metricsdk.ErrUninitializedInstrument, integrator.sdkErr())
require.Equal(t, metricsdk.ErrUninitializedInstrument, testHandler.Flush())
}
func TestSyncInAsync(t *testing.T) {

View File

@ -17,11 +17,11 @@ package metric
import (
"context"
"fmt"
"os"
"runtime"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
@ -62,9 +62,6 @@ type (
// collectLock prevents simultaneous calls to Collect().
collectLock sync.Mutex
// errorHandler supports delivering errors to the user.
errorHandler ErrorHandler
// asyncSortSlice has a single purpose - as a temporary
// place for sorting during labels creation to avoid
// allocation. It is cleared after use.
@ -143,8 +140,6 @@ type (
labels *label.Set
recorder export.Aggregator
}
ErrorHandler func(error)
)
var (
@ -170,7 +165,7 @@ func (s *syncInstrument) Implementation() interface{} {
func (a *asyncInstrument) observe(number api.Number, labels *label.Set) {
if err := aggregator.RangeTest(number, &a.descriptor); err != nil {
a.meter.errorHandler(err)
global.Handle(err)
return
}
recorder := a.getRecorder(labels)
@ -180,7 +175,7 @@ func (a *asyncInstrument) observe(number api.Number, labels *label.Set) {
return
}
if err := recorder.Update(context.Background(), number, &a.descriptor); err != nil {
a.meter.errorHandler(err)
global.Handle(err)
return
}
}
@ -213,10 +208,6 @@ func (a *asyncInstrument) getRecorder(labels *label.Set) export.Aggregator {
return rec
}
func (m *Accumulator) SetErrorHandler(f ErrorHandler) {
m.errorHandler = f
}
// acquireHandle gets or creates a `*record` corresponding to `kvs`,
// the input labels. The second argument `labels` is passed in to
// support re-use of the orderedLabels computed by a previous
@ -314,25 +305,18 @@ func (s *syncInstrument) RecordOne(ctx context.Context, number api.Number, kvs [
// current metric values. A push-based integrator should configure its
// own periodic collection.
func NewAccumulator(integrator export.Integrator, opts ...Option) *Accumulator {
c := &Config{ErrorHandler: DefaultErrorHandler}
c := &Config{}
for _, opt := range opts {
opt.Apply(c)
}
return &Accumulator{
integrator: integrator,
errorHandler: c.ErrorHandler,
asyncInstruments: internal.NewAsyncInstrumentState(c.ErrorHandler),
asyncInstruments: internal.NewAsyncInstrumentState(),
resource: c.Resource,
}
}
// DefaultErrorHandler is used when the user does not configure an
// error handler. Prints messages to os.Stderr.
func DefaultErrorHandler(err error) {
fmt.Fprintln(os.Stderr, "Metrics Accumulator error:", err)
}
// NewSyncInstrument implements api.MetricImpl.
func (m *Accumulator) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl, error) {
return &syncInstrument{
@ -485,7 +469,7 @@ func (m *Accumulator) checkpoint(ctx context.Context, descriptor *metric.Descrip
exportRecord := export.NewRecord(descriptor, labels, m.resource, recorder)
err := m.integrator.Process(ctx, exportRecord)
if err != nil {
m.errorHandler(err)
global.Handle(err)
}
return 1
}
@ -521,11 +505,11 @@ func (r *record) RecordOne(ctx context.Context, number api.Number) {
return
}
if err := aggregator.RangeTest(number, &r.inst.descriptor); err != nil {
r.inst.meter.errorHandler(err)
global.Handle(err)
return
}
if err := r.recorder.Update(ctx, number, &r.inst.descriptor); err != nil {
r.inst.meter.errorHandler(err)
global.Handle(err)
return
}
// Record was modified, inform the Collect() that things need
@ -553,7 +537,7 @@ func (m *Accumulator) fromSync(sync metric.SyncImpl) *syncInstrument {
return inst
}
}
m.errorHandler(ErrUninitializedInstrument)
global.Handle(ErrUninitializedInstrument)
return nil
}
@ -565,6 +549,6 @@ func (m *Accumulator) fromAsync(async metric.AsyncImpl) *asyncInstrument {
return inst
}
}
m.errorHandler(ErrUninitializedInstrument)
global.Handle(ErrUninitializedInstrument)
return nil
}