1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-03-03 14:52:56 +02:00

Merge pull request #791 from MrAlias/err-handle

Update Error Handling
This commit is contained in:
Tyler Yahn 2020-06-02 13:44:20 -07:00 committed by GitHub
commit eb14b395bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 140 additions and 311 deletions

View File

@ -109,6 +109,7 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View File

@ -50,8 +50,6 @@ type Exporter struct {
lock sync.RWMutex
controller *pull.Controller
onError func(error)
defaultSummaryQuantiles []float64
defaultHistogramBoundaries []float64
}
@ -85,10 +83,6 @@ type Config struct {
// DefaultHistogramBoundaries defines the default histogram bucket
// boundaries.
DefaultHistogramBoundaries []float64
// OnError is a function that handle errors that may occur while exporting metrics.
// TODO: This should be refactored or even removed once we have a better error handling mechanism.
OnError func(error)
}
// NewExportPipeline sets up a complete export pipeline with the recommended setup,
@ -106,19 +100,12 @@ func NewExportPipeline(config Config, options ...pull.Option) (*Exporter, error)
config.Gatherer = config.Registry
}
if config.OnError == nil {
config.OnError = func(err error) {
fmt.Println(err.Error())
}
}
e := &Exporter{
handler: promhttp.HandlerFor(config.Gatherer, promhttp.HandlerOpts{}),
registerer: config.Registerer,
gatherer: config.Gatherer,
defaultSummaryQuantiles: config.DefaultSummaryQuantiles,
defaultHistogramBoundaries: config.DefaultHistogramBoundaries,
onError: config.OnError,
}
c := &collector{
@ -257,7 +244,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
return nil
})
if err != nil {
c.exp.onError(err)
global.Handle(err)
}
}

View File

@ -17,7 +17,6 @@ package jaeger
import (
"context"
"encoding/binary"
"log"
"go.opentelemetry.io/otel/api/kv/value"
@ -37,11 +36,6 @@ type Option func(*options)
// options are the options to be used when initializing a Jaeger export.
type options struct {
// OnError is the hook to be called when there is
// an error occurred when uploading the span data.
// If no custom hook is set, errors are logged.
OnError func(err error)
// Process contains the information about the exporting process.
Process Process
@ -55,15 +49,6 @@ type options struct {
RegisterGlobal bool
}
// WithOnError sets the hook to be called when there is
// an error occurred when uploading the span data.
// If no custom hook is set, errors are logged.
func WithOnError(onError func(err error)) Option {
return func(o *options) {
o.OnError = onError
}
}
// WithProcess sets the process with the information about the exporting process.
func WithProcess(process Process) Option {
return func(o *options) {
@ -106,13 +91,6 @@ func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, e
opt(&o)
}
onError := func(err error) {
if o.OnError != nil {
o.OnError(err)
return
}
log.Printf("Error when uploading spans to Jaeger: %v", err)
}
service := o.Process.ServiceName
if service == "" {
service = defaultServiceName
@ -134,7 +112,7 @@ func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, e
}
bundler := bundler.NewBundler((*gen.Span)(nil), func(bundle interface{}) {
if err := e.upload(bundle.([]*gen.Span)); err != nil {
onError(err)
global.Handle(err)
}
})

View File

@ -18,9 +18,9 @@ import (
"context"
"errors"
"fmt"
"os"
"sync"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/metric"
)
@ -41,11 +41,9 @@ type AsyncCollector interface {
type AsyncInstrumentState struct {
lock sync.Mutex
// errorHandler will be called in case of an invalid
// metric.AsyncRunner, i.e., one that does not implement
// either the single-or batch-runner interfaces.
errorHandler func(error)
errorOnce sync.Once
// errorOnce will use the global.Handler to report an error
// once in case of an invalid runner attempting to run.
errorOnce sync.Once
// runnerMap keeps the set of runners that will run each
// collection interval. Singletons are entered with a real
@ -79,20 +77,9 @@ type asyncRunnerPair struct {
// NewAsyncInstrumentState returns a new *AsyncInstrumentState, for
// use by MeterImpl to manage running the set of observer callbacks in
// the correct order.
//
// errorHandler is used to print an error condition. If errorHandler
// nil, the default error handler will be used that prints to
// os.Stderr. Only the first error is passed to the handler, after
// which errors are skipped.
func NewAsyncInstrumentState(errorHandler func(error)) *AsyncInstrumentState {
if errorHandler == nil {
errorHandler = func(err error) {
fmt.Fprintln(os.Stderr, "Metrics Async state error:", err)
}
}
func NewAsyncInstrumentState() *AsyncInstrumentState {
return &AsyncInstrumentState{
errorHandler: errorHandler,
runnerMap: map[asyncRunnerPair]struct{}{},
runnerMap: map[asyncRunnerPair]struct{}{},
}
}
@ -155,7 +142,7 @@ func (a *AsyncInstrumentState) Run(ctx context.Context, collector AsyncCollector
}
a.errorOnce.Do(func() {
a.errorHandler(fmt.Errorf("%w: type %T (reported once)", ErrInvalidAsyncRunner, rp))
global.Handle(fmt.Errorf("%w: type %T (reported once)", ErrInvalidAsyncRunner, rp))
})
}
}

View File

@ -114,7 +114,7 @@ func (m *MeterImpl) doRecordSingle(ctx context.Context, labels []kv.KeyValue, in
func NewProvider() (*MeterImpl, apimetric.Provider) {
impl := &MeterImpl{
asyncInstruments: NewAsyncInstrumentState(nil),
asyncInstruments: NewAsyncInstrumentState(),
}
return impl, registry.NewProvider(impl)
}

View File

@ -14,16 +14,12 @@
package metric
import "go.opentelemetry.io/otel/sdk/resource"
import (
"go.opentelemetry.io/otel/sdk/resource"
)
// Config contains configuration for an SDK.
type Config struct {
// ErrorHandler is the function called when the SDK encounters an error.
//
// This option can be overridden after instantiation of the SDK
// with the `SetErrorHandler` method.
ErrorHandler ErrorHandler
// Resource describes all the metric records processed by the
// Accumulator.
Resource *resource.Resource
@ -35,17 +31,6 @@ type Option interface {
Apply(*Config)
}
// WithErrorHandler sets the ErrorHandler configuration option of a Config.
func WithErrorHandler(fn ErrorHandler) Option {
return errorHandlerOption(fn)
}
type errorHandlerOption ErrorHandler
func (o errorHandlerOption) Apply(config *Config) {
config.ErrorHandler = ErrorHandler(o)
}
// WithResource sets the Resource configuration option of a Config.
func WithResource(res *resource.Resource) Option {
return resourceOption{res}

View File

@ -1,45 +0,0 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestWithErrorHandler(t *testing.T) {
errH, reg := func() (ErrorHandler, *error) {
e := fmt.Errorf("default invalid")
reg := &e
return func(err error) {
*reg = err
}, reg
}()
c := &Config{}
WithErrorHandler(errH).Apply(c)
err1 := fmt.Errorf("error 1")
c.ErrorHandler(err1)
assert.EqualError(t, *reg, err1.Error())
// Ensure overwriting works.
c = &Config{ErrorHandler: DefaultErrorHandler}
WithErrorHandler(errH).Apply(c)
err2 := fmt.Errorf("error 2")
c.ErrorHandler(err2)
assert.EqualError(t, *reg, err2.Error())
}

View File

@ -17,17 +17,11 @@ package pull
import (
"time"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)
// Config contains configuration for a push Controller.
type Config struct {
// ErrorHandler is the function called when the Controller encounters an error.
//
// This option can be overridden after instantiation of the Controller
// with the `SetErrorHandler` method.
ErrorHandler sdk.ErrorHandler
// Resource is the OpenTelemetry resource associated with all Meters
// created by the Controller.
@ -52,17 +46,6 @@ type Option interface {
Apply(*Config)
}
// WithErrorHandler sets the ErrorHandler configuration option of a Config.
func WithErrorHandler(fn sdk.ErrorHandler) Option {
return errorHandlerOption(fn)
}
type errorHandlerOption sdk.ErrorHandler
func (o errorHandlerOption) Apply(config *Config) {
config.ErrorHandler = sdk.ErrorHandler(o)
}
// WithResource sets the Resource configuration option of a Config.
func WithResource(r *resource.Resource) Option {
return resourceOption{r}

View File

@ -47,9 +47,8 @@ type Controller struct {
// New returns a *Controller configured with an aggregation selector and options.
func New(selector export.AggregationSelector, options ...Option) *Controller {
config := &Config{
Resource: resource.Empty(),
ErrorHandler: sdk.DefaultErrorHandler,
CachePeriod: DefaultCachePeriod,
Resource: resource.Empty(),
CachePeriod: DefaultCachePeriod,
}
for _, opt := range options {
opt.Apply(config)
@ -58,7 +57,6 @@ func New(selector export.AggregationSelector, options ...Option) *Controller {
accum := sdk.NewAccumulator(
integrator,
sdk.WithResource(config.Resource),
sdk.WithErrorHandler(config.ErrorHandler),
)
return &Controller{
accumulator: accum,

View File

@ -17,18 +17,11 @@ package push
import (
"time"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)
// Config contains configuration for a push Controller.
type Config struct {
// ErrorHandler is the function called when the Controller encounters an error.
//
// This option can be overridden after instantiation of the Controller
// with the `SetErrorHandler` method.
ErrorHandler sdk.ErrorHandler
// Resource is the OpenTelemetry resource associated with all Meters
// created by the Controller.
Resource *resource.Resource
@ -53,17 +46,6 @@ type Option interface {
Apply(*Config)
}
// WithErrorHandler sets the ErrorHandler configuration option of a Config.
func WithErrorHandler(fn sdk.ErrorHandler) Option {
return errorHandlerOption(fn)
}
type errorHandlerOption sdk.ErrorHandler
func (o errorHandlerOption) Apply(config *Config) {
config.ErrorHandler = sdk.ErrorHandler(o)
}
// WithResource sets the Resource configuration option of a Config.
func WithResource(r *resource.Resource) Option {
return resourceOption{r}

View File

@ -15,40 +15,15 @@
package push
import (
"fmt"
"testing"
"go.opentelemetry.io/otel/api/kv"
"github.com/stretchr/testify/assert"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)
func TestWithErrorHandler(t *testing.T) {
errH, reg := func() (sdk.ErrorHandler, *error) {
e := fmt.Errorf("default invalid")
reg := &e
return func(err error) {
*reg = err
}, reg
}()
c := &Config{}
WithErrorHandler(errH).Apply(c)
err1 := fmt.Errorf("error 1")
c.ErrorHandler(err1)
assert.EqualError(t, *reg, err1.Error())
// Ensure overwriting works.
c = &Config{ErrorHandler: sdk.DefaultErrorHandler}
WithErrorHandler(errH).Apply(c)
err2 := fmt.Errorf("error 2")
c.ErrorHandler(err2)
assert.EqualError(t, *reg, err2.Error())
}
func TestWithResource(t *testing.T) {
r := resource.New(kv.String("A", "a"))

View File

@ -19,6 +19,7 @@ import (
"sync"
"time"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/metric/registry"
export "go.opentelemetry.io/otel/sdk/export/metric"
@ -32,18 +33,17 @@ const DefaultPushPeriod = 10 * time.Second
// Controller organizes a periodic push of metric data.
type Controller struct {
lock sync.Mutex
accumulator *sdk.Accumulator
provider *registry.Provider
errorHandler sdk.ErrorHandler
integrator *simple.Integrator
exporter export.Exporter
wg sync.WaitGroup
ch chan struct{}
period time.Duration
timeout time.Duration
clock controllerTime.Clock
ticker controllerTime.Ticker
lock sync.Mutex
accumulator *sdk.Accumulator
provider *registry.Provider
integrator *simple.Integrator
exporter export.Exporter
wg sync.WaitGroup
ch chan struct{}
period time.Duration
timeout time.Duration
clock controllerTime.Clock
ticker controllerTime.Ticker
}
// New constructs a Controller, an implementation of metric.Provider,
@ -51,8 +51,7 @@ type Controller struct {
// periodic collection.
func New(selector export.AggregationSelector, exporter export.Exporter, opts ...Option) *Controller {
c := &Config{
ErrorHandler: sdk.DefaultErrorHandler,
Period: DefaultPushPeriod,
Period: DefaultPushPeriod,
}
for _, opt := range opts {
opt.Apply(c)
@ -64,19 +63,17 @@ func New(selector export.AggregationSelector, exporter export.Exporter, opts ...
integrator := simple.New(selector, c.Stateful)
impl := sdk.NewAccumulator(
integrator,
sdk.WithErrorHandler(c.ErrorHandler),
sdk.WithResource(c.Resource),
)
return &Controller{
provider: registry.NewProvider(impl),
accumulator: impl,
integrator: integrator,
exporter: exporter,
errorHandler: c.ErrorHandler,
ch: make(chan struct{}),
period: c.Period,
timeout: c.Timeout,
clock: controllerTime.RealClock{},
provider: registry.NewProvider(impl),
accumulator: impl,
integrator: integrator,
exporter: exporter,
ch: make(chan struct{}),
period: c.Period,
timeout: c.Timeout,
clock: controllerTime.RealClock{},
}
}
@ -88,15 +85,6 @@ func (c *Controller) SetClock(clock controllerTime.Clock) {
c.clock = clock
}
// SetErrorHandler sets the handler for errors. If none has been set, the
// SDK default error handler is used.
func (c *Controller) SetErrorHandler(errorHandler sdk.ErrorHandler) {
c.lock.Lock()
defer c.lock.Unlock()
c.errorHandler = errorHandler
c.accumulator.SetErrorHandler(errorHandler)
}
// Provider returns a metric.Provider instance for this controller.
func (c *Controller) Provider() metric.Provider {
return c.provider
@ -160,6 +148,6 @@ func (c *Controller) tick() {
c.integrator.FinishedCollection()
if err != nil {
c.errorHandler(err)
global.Handle(err)
}
}

View File

@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
@ -38,6 +39,32 @@ import (
var testResource = resource.New(kv.String("R", "V"))
type handler struct {
sync.Mutex
err error
}
func (h *handler) Handle(err error) {
h.Lock()
h.err = err
h.Unlock()
}
func (h *handler) Flush() error {
h.Lock()
err := h.err
h.err = nil
h.Unlock()
return err
}
var testHandler *handler
func init() {
testHandler = new(handler)
global.SetHandler(testHandler)
}
type testExporter struct {
t *testing.T
lock sync.Mutex
@ -204,14 +231,6 @@ func TestPushExportError(t *testing.T) {
push.WithResource(testResource),
)
var err error
var lock sync.Mutex
p.SetErrorHandler(func(sdkErr error) {
lock.Lock()
defer lock.Unlock()
err = sdkErr
})
mock := controllerTest.NewMockClock()
p.SetClock(mock)
@ -228,21 +247,20 @@ func TestPushExportError(t *testing.T) {
counter2.Add(ctx, 5)
require.Equal(t, 0, fix.exporter.exports)
require.Nil(t, err)
require.Nil(t, testHandler.Flush())
mock.Add(time.Second)
runtime.Gosched()
records, exports := fix.exporter.resetRecords()
require.Equal(t, 1, exports)
lock.Lock()
if tt.expectedError == nil {
require.NoError(t, err)
require.NoError(t, testHandler.Flush())
} else {
err := testHandler.Flush()
require.Error(t, err)
require.Equal(t, tt.expectedError, err)
}
lock.Unlock()
require.Equal(t, len(tt.expectedDescriptors), len(records))
for _, r := range records {
require.Contains(t, tt.expectedDescriptors,

View File

@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
@ -40,42 +41,59 @@ import (
var Must = metric.Must
var testResource = resource.New(kv.String("R", "V"))
type handler struct {
sync.Mutex
err error
}
func (h *handler) Handle(err error) {
h.Lock()
h.err = err
h.Unlock()
}
func (h *handler) Reset() {
h.Lock()
h.err = nil
h.Unlock()
}
func (h *handler) Flush() error {
h.Lock()
err := h.err
h.err = nil
h.Unlock()
return err
}
var testHandler *handler
func init() {
testHandler = new(handler)
global.SetHandler(testHandler)
}
type correctnessIntegrator struct {
newAggCount int64
t *testing.T
records []export.Record
sync.Mutex
err error
}
func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessIntegrator) {
testHandler.Reset()
integrator := &correctnessIntegrator{
t: t,
}
accum := metricsdk.NewAccumulator(
integrator,
metricsdk.WithResource(testResource),
metricsdk.WithErrorHandler(func(err error) {
integrator.Lock()
defer integrator.Unlock()
integrator.err = err
}),
)
meter := metric.WrapMeterImpl(accum, "test")
return meter, accum, integrator
}
func (ci *correctnessIntegrator) sdkErr() error {
ci.Lock()
defer ci.Unlock()
t := ci.err
ci.err = nil
return t
}
func (ci *correctnessIntegrator) AggregatorFor(descriptor *metric.Descriptor) (agg export.Aggregator) {
name := descriptor.Name()
@ -110,16 +128,10 @@ func TestInputRangeCounter(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
sdkErr = handleErr
})
counter := Must(meter).NewInt64Counter("name.counter")
counter.Add(ctx, -1)
require.Equal(t, aggregator.ErrNegativeInput, sdkErr)
sdkErr = nil
require.Equal(t, aggregator.ErrNegativeInput, testHandler.Flush())
checkpointed := sdk.Collect(ctx)
require.Equal(t, 0, checkpointed)
@ -131,18 +143,13 @@ func TestInputRangeCounter(t *testing.T) {
require.Equal(t, int64(1), sum.AsInt64())
require.Equal(t, 1, checkpointed)
require.Nil(t, err)
require.Nil(t, sdkErr)
require.Nil(t, testHandler.Flush())
}
func TestInputRangeUpDownCounter(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
sdkErr = handleErr
})
counter := Must(meter).NewInt64UpDownCounter("name.updowncounter")
counter.Add(ctx, -1)
@ -155,23 +162,17 @@ func TestInputRangeUpDownCounter(t *testing.T) {
require.Equal(t, int64(1), sum.AsInt64())
require.Equal(t, 1, checkpointed)
require.Nil(t, err)
require.Nil(t, sdkErr)
require.Nil(t, testHandler.Flush())
}
func TestInputRangeValueRecorder(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
sdkErr = handleErr
})
valuerecorder := Must(meter).NewFloat64ValueRecorder("name.valuerecorder")
valuerecorder.Record(ctx, math.NaN())
require.Equal(t, aggregator.ErrNaNInput, sdkErr)
sdkErr = nil
require.Equal(t, aggregator.ErrNaNInput, testHandler.Flush())
checkpointed := sdk.Collect(ctx)
require.Equal(t, 0, checkpointed)
@ -185,7 +186,7 @@ func TestInputRangeValueRecorder(t *testing.T) {
count, err := integrator.records[0].Aggregator().(aggregator.Distribution).Count()
require.Equal(t, int64(2), count)
require.Equal(t, 1, checkpointed)
require.Nil(t, sdkErr)
require.Nil(t, testHandler.Flush())
require.Nil(t, err)
}
@ -204,17 +205,13 @@ func TestDisabledInstrument(t *testing.T) {
func TestRecordNaN(t *testing.T) {
ctx := context.Background()
meter, sdk, _ := newSDK(t)
meter, _, _ := newSDK(t)
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
sdkErr = handleErr
})
c := Must(meter).NewFloat64Counter("sum.name")
require.Nil(t, sdkErr)
require.Nil(t, testHandler.Flush())
c.Add(ctx, math.NaN())
require.Error(t, sdkErr)
require.Error(t, testHandler.Flush())
}
func TestSDKLabelsDeduplication(t *testing.T) {
@ -395,15 +392,15 @@ func TestSumObserverInputRange(t *testing.T) {
_ = Must(meter).NewFloat64SumObserver("float.sumobserver", func(_ context.Context, result metric.Float64ObserverResult) {
result.Observe(-2, kv.String("A", "B"))
require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr())
require.Equal(t, aggregator.ErrNegativeInput, testHandler.Flush())
result.Observe(-1, kv.String("C", "D"))
require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr())
require.Equal(t, aggregator.ErrNegativeInput, testHandler.Flush())
})
_ = Must(meter).NewInt64SumObserver("int.sumobserver", func(_ context.Context, result metric.Int64ObserverResult) {
result.Observe(-1, kv.String("A", "B"))
require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr())
require.Equal(t, aggregator.ErrNegativeInput, testHandler.Flush())
result.Observe(-1)
require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr())
require.Equal(t, aggregator.ErrNegativeInput, testHandler.Flush())
})
collected := sdk.Collect(ctx)
@ -412,7 +409,7 @@ func TestSumObserverInputRange(t *testing.T) {
require.Equal(t, 0, len(integrator.records))
// check that the error condition was reset
require.NoError(t, integrator.sdkErr())
require.NoError(t, testHandler.Flush())
}
func TestObserverBatch(t *testing.T) {
@ -553,7 +550,7 @@ func TestIncorrectInstruments(t *testing.T) {
var observer metric.Int64ValueObserver
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
meter, sdk, _ := newSDK(t)
// Now try with uninitialized instruments.
meter.RecordBatch(ctx, nil, counter.Measurement(1))
@ -562,7 +559,7 @@ func TestIncorrectInstruments(t *testing.T) {
})
collected := sdk.Collect(ctx)
require.Equal(t, metricsdk.ErrUninitializedInstrument, integrator.sdkErr())
require.Equal(t, metricsdk.ErrUninitializedInstrument, testHandler.Flush())
require.Equal(t, 0, collected)
// Now try with instruments from another SDK.
@ -579,7 +576,7 @@ func TestIncorrectInstruments(t *testing.T) {
collected = sdk.Collect(ctx)
require.Equal(t, 0, collected)
require.Equal(t, metricsdk.ErrUninitializedInstrument, integrator.sdkErr())
require.Equal(t, metricsdk.ErrUninitializedInstrument, testHandler.Flush())
}
func TestSyncInAsync(t *testing.T) {

View File

@ -17,11 +17,11 @@ package metric
import (
"context"
"fmt"
"os"
"runtime"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
@ -62,9 +62,6 @@ type (
// collectLock prevents simultaneous calls to Collect().
collectLock sync.Mutex
// errorHandler supports delivering errors to the user.
errorHandler ErrorHandler
// asyncSortSlice has a single purpose - as a temporary
// place for sorting during labels creation to avoid
// allocation. It is cleared after use.
@ -143,8 +140,6 @@ type (
labels *label.Set
recorder export.Aggregator
}
ErrorHandler func(error)
)
var (
@ -170,7 +165,7 @@ func (s *syncInstrument) Implementation() interface{} {
func (a *asyncInstrument) observe(number api.Number, labels *label.Set) {
if err := aggregator.RangeTest(number, &a.descriptor); err != nil {
a.meter.errorHandler(err)
global.Handle(err)
return
}
recorder := a.getRecorder(labels)
@ -180,7 +175,7 @@ func (a *asyncInstrument) observe(number api.Number, labels *label.Set) {
return
}
if err := recorder.Update(context.Background(), number, &a.descriptor); err != nil {
a.meter.errorHandler(err)
global.Handle(err)
return
}
}
@ -213,10 +208,6 @@ func (a *asyncInstrument) getRecorder(labels *label.Set) export.Aggregator {
return rec
}
func (m *Accumulator) SetErrorHandler(f ErrorHandler) {
m.errorHandler = f
}
// acquireHandle gets or creates a `*record` corresponding to `kvs`,
// the input labels. The second argument `labels` is passed in to
// support re-use of the orderedLabels computed by a previous
@ -314,25 +305,18 @@ func (s *syncInstrument) RecordOne(ctx context.Context, number api.Number, kvs [
// current metric values. A push-based integrator should configure its
// own periodic collection.
func NewAccumulator(integrator export.Integrator, opts ...Option) *Accumulator {
c := &Config{ErrorHandler: DefaultErrorHandler}
c := &Config{}
for _, opt := range opts {
opt.Apply(c)
}
return &Accumulator{
integrator: integrator,
errorHandler: c.ErrorHandler,
asyncInstruments: internal.NewAsyncInstrumentState(c.ErrorHandler),
asyncInstruments: internal.NewAsyncInstrumentState(),
resource: c.Resource,
}
}
// DefaultErrorHandler is used when the user does not configure an
// error handler. Prints messages to os.Stderr.
func DefaultErrorHandler(err error) {
fmt.Fprintln(os.Stderr, "Metrics Accumulator error:", err)
}
// NewSyncInstrument implements api.MetricImpl.
func (m *Accumulator) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl, error) {
return &syncInstrument{
@ -485,7 +469,7 @@ func (m *Accumulator) checkpoint(ctx context.Context, descriptor *metric.Descrip
exportRecord := export.NewRecord(descriptor, labels, m.resource, recorder)
err := m.integrator.Process(ctx, exportRecord)
if err != nil {
m.errorHandler(err)
global.Handle(err)
}
return 1
}
@ -521,11 +505,11 @@ func (r *record) RecordOne(ctx context.Context, number api.Number) {
return
}
if err := aggregator.RangeTest(number, &r.inst.descriptor); err != nil {
r.inst.meter.errorHandler(err)
global.Handle(err)
return
}
if err := r.recorder.Update(ctx, number, &r.inst.descriptor); err != nil {
r.inst.meter.errorHandler(err)
global.Handle(err)
return
}
// Record was modified, inform the Collect() that things need
@ -553,7 +537,7 @@ func (m *Accumulator) fromSync(sync metric.SyncImpl) *syncInstrument {
return inst
}
}
m.errorHandler(ErrUninitializedInstrument)
global.Handle(ErrUninitializedInstrument)
return nil
}
@ -565,6 +549,6 @@ func (m *Accumulator) fromAsync(async metric.AsyncImpl) *asyncInstrument {
return inst
}
}
m.errorHandler(ErrUninitializedInstrument)
global.Handle(ErrUninitializedInstrument)
return nil
}

View File

@ -16,6 +16,7 @@ package trace
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
@ -23,6 +24,7 @@ import (
"google.golang.org/grpc/codes"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
apitrace "go.opentelemetry.io/otel/api/trace"
export "go.opentelemetry.io/otel/sdk/export/trace"
@ -200,12 +202,14 @@ func (s *span) addEventWithTimestamp(timestamp time.Time, name string, attrs ...
})
}
var errUninitializedSpan = errors.New("failed to set name on uninitialized span")
func (s *span) SetName(name string) {
s.mu.Lock()
defer s.mu.Unlock()
if s.data == nil {
// TODO: now what?
global.Handle(errUninitializedSpan)
return
}
s.data.Name = name

View File

@ -24,6 +24,7 @@ import (
"testing"
"time"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv/value"
"github.com/google/go-cmp/cmp"
@ -43,9 +44,15 @@ var (
sid apitrace.SpanID
)
type discardHandler struct{}
func (*discardHandler) Handle(_ error) {}
func init() {
tid, _ = apitrace.IDFromHex("01020304050607080102040810203040")
sid, _ = apitrace.SpanIDFromHex("0102040810203040")
global.SetHandler(new(discardHandler))
}
func TestTracerFollowsExpectedAPIBehaviour(t *testing.T) {