mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2024-12-14 10:13:10 +02:00
1fab21ddbf
* Support use of synchronous instruments in async callbacks * Add a test
162 lines
5.2 KiB
Go
162 lines
5.2 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package metric
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
|
|
"go.opentelemetry.io/otel/api/kv"
|
|
"go.opentelemetry.io/otel/api/metric"
|
|
)
|
|
|
|
// ErrInvalidAsyncRunner is the sentinel error wrapped and handed to the
// error handler by AsyncInstrumentState.Run when a registered runner is
// neither a metric.AsyncSingleRunner nor a metric.AsyncBatchRunner.
var ErrInvalidAsyncRunner = errors.New("unknown async runner type")
|
|
|
|
// AsyncCollector is an interface used between the MeterImpl and the
|
|
// AsyncInstrumentState helper below. This interface is implemented by
|
|
// the SDK to provide support for running observer callbacks.
|
|
type AsyncCollector interface {
|
|
// CollectAsync passes a batch of observations to the MeterImpl.
|
|
CollectAsync([]kv.KeyValue, ...metric.Observation)
|
|
}
|
|
|
|
// AsyncInstrumentState manages an ordered set of asynchronous
|
|
// instruments and the distinct runners, taking into account batch
|
|
// observer callbacks.
|
|
type AsyncInstrumentState struct {
|
|
lock sync.Mutex
|
|
|
|
// errorHandler will be called in case of an invalid
|
|
// metric.AsyncRunner, i.e., one that does not implement
|
|
// either the single-or batch-runner interfaces.
|
|
errorHandler func(error)
|
|
errorOnce sync.Once
|
|
|
|
// runnerMap keeps the set of runners that will run each
|
|
// collection interval. Singletons are entered with a real
|
|
// instrument each, batch observers are entered with a nil
|
|
// instrument, ensuring that when a singleton callback is used
|
|
// repeatedly, it is excuted repeatedly in the interval, while
|
|
// when a batch callback is used repeatedly, it only executes
|
|
// once per interval.
|
|
runnerMap map[asyncRunnerPair]struct{}
|
|
|
|
// runners maintains the set of runners in the order they were
|
|
// registered.
|
|
runners []asyncRunnerPair
|
|
|
|
// instruments maintains the set of instruments in the order
|
|
// they were registered.
|
|
instruments []metric.AsyncImpl
|
|
}
|
|
|
|
// asyncRunnerPair is a map entry for Observer callback runners.
|
|
type asyncRunnerPair struct {
|
|
// runner is used as a map key here. The API ensures
|
|
// that all callbacks are pointers for this reason.
|
|
runner metric.AsyncRunner
|
|
|
|
// inst refers to a non-nil instrument when `runner` is a
|
|
// AsyncSingleRunner.
|
|
inst metric.AsyncImpl
|
|
}
|
|
|
|
// NewAsyncInstrumentState returns a new *AsyncInstrumentState, for
|
|
// use by MeterImpl to manage running the set of observer callbacks in
|
|
// the correct order.
|
|
//
|
|
// errorHandler is used to print an error condition. If errorHandler
|
|
// nil, the default error handler will be used that prints to
|
|
// os.Stderr. Only the first error is passed to the handler, after
|
|
// which errors are skipped.
|
|
func NewAsyncInstrumentState(errorHandler func(error)) *AsyncInstrumentState {
|
|
if errorHandler == nil {
|
|
errorHandler = func(err error) {
|
|
fmt.Fprintln(os.Stderr, "Metrics Async state error:", err)
|
|
}
|
|
}
|
|
return &AsyncInstrumentState{
|
|
errorHandler: errorHandler,
|
|
runnerMap: map[asyncRunnerPair]struct{}{},
|
|
}
|
|
}
|
|
|
|
// Instruments returns the asynchronous instruments managed by this
|
|
// object, the set that should be checkpointed after observers are
|
|
// run.
|
|
func (a *AsyncInstrumentState) Instruments() []metric.AsyncImpl {
|
|
a.lock.Lock()
|
|
defer a.lock.Unlock()
|
|
return a.instruments
|
|
}
|
|
|
|
// Register adds a new asynchronous instrument to by managed by this
|
|
// object. This should be called during NewAsyncInstrument() and
|
|
// assumes that errors (e.g., duplicate registration) have already
|
|
// been checked.
|
|
func (a *AsyncInstrumentState) Register(inst metric.AsyncImpl, runner metric.AsyncRunner) {
|
|
a.lock.Lock()
|
|
defer a.lock.Unlock()
|
|
|
|
a.instruments = append(a.instruments, inst)
|
|
|
|
// asyncRunnerPair reflects this callback in the asyncRunners
|
|
// list. If this is a batch runner, the instrument is nil.
|
|
// If this is a single-Observer runner, the instrument is
|
|
// included. This ensures that batch callbacks are called
|
|
// once and single callbacks are called once per instrument.
|
|
rp := asyncRunnerPair{
|
|
runner: runner,
|
|
}
|
|
if _, ok := runner.(metric.AsyncSingleRunner); ok {
|
|
rp.inst = inst
|
|
}
|
|
|
|
if _, ok := a.runnerMap[rp]; !ok {
|
|
a.runnerMap[rp] = struct{}{}
|
|
a.runners = append(a.runners, rp)
|
|
}
|
|
}
|
|
|
|
// Run executes the complete set of observer callbacks.
|
|
func (a *AsyncInstrumentState) Run(ctx context.Context, collector AsyncCollector) {
|
|
a.lock.Lock()
|
|
runners := a.runners
|
|
a.lock.Unlock()
|
|
|
|
for _, rp := range runners {
|
|
// The runner must be a single or batch runner, no
|
|
// other implementations are possible because the
|
|
// interface has un-exported methods.
|
|
|
|
if singleRunner, ok := rp.runner.(metric.AsyncSingleRunner); ok {
|
|
singleRunner.Run(ctx, rp.inst, collector.CollectAsync)
|
|
continue
|
|
}
|
|
|
|
if multiRunner, ok := rp.runner.(metric.AsyncBatchRunner); ok {
|
|
multiRunner.Run(ctx, collector.CollectAsync)
|
|
continue
|
|
}
|
|
|
|
a.errorOnce.Do(func() {
|
|
a.errorHandler(fmt.Errorf("%w: type %T (reported once)", ErrInvalidAsyncRunner, rp))
|
|
})
|
|
}
|
|
}
|