1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2024-12-12 10:04:29 +02:00
opentelemetry-go/internal/metric/async.go
Pablo Baeyens 19294aab4c
Add vanity imports to internal packages (#2280)
* `porto -w --include-internal .`

* Bump `porto` to 0.4.0 and include internal files

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
2021-10-29 09:34:37 -07:00

150 lines
4.9 KiB
Go

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric // import "go.opentelemetry.io/otel/internal/metric"
import (
"context"
"errors"
"fmt"
"sync"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/sdkapi"
)
// ErrInvalidAsyncRunner is the sentinel error that Run wraps and reports
// through otel.Handle when a registered runner is neither an
// sdkapi.AsyncSingleRunner nor an sdkapi.AsyncBatchRunner.
//
//nolint:revive // ignoring missing comments for exported error in an internal package
var ErrInvalidAsyncRunner = errors.New("unknown async runner type")
// AsyncCollector is the contract between the MeterImpl and the
// AsyncInstrumentState helper defined in this file. The SDK implements
// it so that observer callbacks have somewhere to deliver their results.
type AsyncCollector interface {
	// CollectAsync hands one batch of observations, together with the
	// labels they were recorded with, to the MeterImpl.
	CollectAsync(labels []attribute.KeyValue, observation ...sdkapi.Observation)
}
// AsyncInstrumentState tracks, in registration order, the asynchronous
// instruments and the distinct runners that drive them, with special
// handling for batch observer callbacks.
type AsyncInstrumentState struct {
	// lock guards runnerMap, runners and instruments.
	lock sync.Mutex

	// errorOnce limits reporting of an invalid runner (via the
	// otel.Handler) to a single occurrence.
	errorOnce sync.Once

	// runnerMap is the de-duplication set of runners executed on each
	// collection interval. A singleton runner is keyed together with
	// its instrument, so a singleton callback shared by several
	// instruments runs once per instrument. A batch runner is keyed
	// with a nil instrument, so a batch callback shared by several
	// instruments runs only once per interval.
	runnerMap map[asyncRunnerPair]struct{}

	// runners lists the distinct runners in registration order.
	runners []asyncRunnerPair

	// instruments lists the registered instruments in registration
	// order.
	instruments []sdkapi.AsyncImpl
}
// asyncRunnerPair is the key type under which Observer callback
// runners are stored in AsyncInstrumentState.runnerMap.
type asyncRunnerPair struct {
	// runner participates in the map key; the API guarantees every
	// callback is a pointer so that it can be used this way.
	runner sdkapi.AsyncRunner

	// inst is the non-nil instrument paired with an AsyncSingleRunner.
	// It is left nil for batch runners.
	inst sdkapi.AsyncImpl
}
// NewAsyncInstrumentState constructs an empty *AsyncInstrumentState
// with its runner set initialized. MeterImpl uses it to run the set of
// observer callbacks in the correct order.
func NewAsyncInstrumentState() *AsyncInstrumentState {
	state := new(AsyncInstrumentState)
	// The map must exist before Register writes to it.
	state.runnerMap = make(map[asyncRunnerPair]struct{})
	return state
}
// Instruments reports the asynchronous instruments managed by this
// object, i.e. the set that should be checkpointed once the observers
// have run. The slice is read under the lock and returned as-is (it is
// not copied).
func (a *AsyncInstrumentState) Instruments() []sdkapi.AsyncImpl {
	a.lock.Lock()
	registered := a.instruments
	a.lock.Unlock()

	return registered
}
// Register adds a new asynchronous instrument to be managed by this
// object. It is meant to be called during NewAsyncInstrument() and
// assumes that errors (e.g., duplicate registration) were already
// checked by the caller.
func (a *AsyncInstrumentState) Register(inst sdkapi.AsyncImpl, runner sdkapi.AsyncRunner) {
	a.lock.Lock()
	defer a.lock.Unlock()

	a.instruments = append(a.instruments, inst)

	// Build the de-duplication key for this callback. A batch runner
	// is keyed with a nil instrument, so it is recorded (and later
	// called) once no matter how many instruments share it. A
	// single-Observer runner is keyed with its instrument, so it is
	// recorded once per instrument.
	key := asyncRunnerPair{runner: runner}
	if _, isSingle := runner.(sdkapi.AsyncSingleRunner); isSingle {
		key.inst = inst
	}

	if _, seen := a.runnerMap[key]; seen {
		return
	}
	a.runnerMap[key] = struct{}{}
	a.runners = append(a.runners, key)
}
// Run executes the complete set of observer callbacks, in the order
// their runners were first registered.
//
// The runner list is read while holding the lock, and the callbacks are
// then invoked after the lock has been released. Each callback receives
// collector.CollectAsync as the sink for its observations. A runner
// that is neither an sdkapi.AsyncSingleRunner nor an
// sdkapi.AsyncBatchRunner is skipped, and ErrInvalidAsyncRunner is
// reported through otel.Handle at most once for the lifetime of this
// AsyncInstrumentState.
func (a *AsyncInstrumentState) Run(ctx context.Context, collector AsyncCollector) {
	a.lock.Lock()
	runners := a.runners
	a.lock.Unlock()

	for _, rp := range runners {
		// The runner must be a single or batch runner, no other
		// implementations are possible because the interface has
		// un-exported methods. The single-runner case is checked
		// first, matching the registration logic.
		switch r := rp.runner.(type) {
		case sdkapi.AsyncSingleRunner:
			r.Run(ctx, rp.inst, collector.CollectAsync)
		case sdkapi.AsyncBatchRunner:
			r.Run(ctx, collector.CollectAsync)
		default:
			a.errorOnce.Do(func() {
				// Report the dynamic type of the runner itself, not of
				// the asyncRunnerPair wrapper, so the message identifies
				// the offending implementation.
				otel.Handle(fmt.Errorf("%w: type %T (reported once)", ErrInvalidAsyncRunner, rp.runner))
			})
		}
	}
}