1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-22 03:38:42 +02:00
opentelemetry-go/sdk/metric/periodic_reader.go
Tyler Yahn 5dff273a1e
Use gofumpt instead of gofmt (#4623)
* Use gofumpt instead of gofmt in golangci-lint conf

* Run gofumpt fixes

* Format generated templates

---------

Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
2023-10-16 10:02:21 -07:00

382 lines
11 KiB
Go

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// Default periodic reader timing, used when neither an option nor the
// environment supplies a value.
const (
	// defaultTimeout is the default bound on one collect-and-export attempt.
	defaultTimeout = 30 * time.Second
	// defaultInterval is the default time between periodic exports.
	defaultInterval = 60 * time.Second
)
// periodicReaderConfig contains configuration options for a PeriodicReader.
type periodicReaderConfig struct {
// interval is the time between periodic exports; NewPeriodicReader hands it
// to the run loop, which uses it for its ticker.
interval time.Duration
// timeout bounds a single collect-and-export attempt.
timeout time.Duration
// producers are the external Producers stored on the reader and queried on
// every collection in addition to the SDK producer.
producers []Producer
}
// newPeriodicReaderConfig builds a periodicReaderConfig. The interval and
// timeout are first seeded through envDuration (with the package defaults as
// fallbacks), and then each option is applied in the order given so later
// options win over earlier ones.
func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig {
	var cfg periodicReaderConfig
	cfg.interval = envDuration(envInterval, defaultInterval)
	cfg.timeout = envDuration(envTimeout, defaultTimeout)

	for i := range options {
		cfg = options[i].applyPeriodic(cfg)
	}
	return cfg
}
// PeriodicReaderOption applies a configuration option value to a PeriodicReader.
//
// Values are obtained from constructors such as WithTimeout and WithInterval
// and passed to NewPeriodicReader. The interface's only method is unexported,
// so it cannot be implemented outside this package.
type PeriodicReaderOption interface {
// applyPeriodic returns the received configuration with this option applied.
applyPeriodic(periodicReaderConfig) periodicReaderConfig
}
// periodicReaderOptionFunc adapts a plain function to the
// PeriodicReaderOption interface: the function receives the current
// periodicReaderConfig and returns the updated one.
type periodicReaderOptionFunc func(periodicReaderConfig) periodicReaderConfig
// applyPeriodic invokes the wrapped function on conf and returns the
// configuration it produces.
func (o periodicReaderOptionFunc) applyPeriodic(conf periodicReaderConfig) periodicReaderConfig {
	updated := o(conf)
	return updated
}
// WithTimeout configures the time a PeriodicReader waits for an export to
// complete before canceling it. This includes an export which occurs as part
// of Shutdown or ForceFlush if the user passed context does not have a
// deadline. If the user passed context does have a deadline, it will be used
// instead.
//
// This option overrides any value set for the
// OTEL_METRIC_EXPORT_TIMEOUT environment variable.
//
// If d is less than or equal to zero the option is a no-op: the timeout
// already present in the configuration (seeded from the environment or, by
// default, 30 seconds) is kept.
func WithTimeout(d time.Duration) PeriodicReaderOption {
	apply := func(cfg periodicReaderConfig) periodicReaderConfig {
		// Only a strictly positive duration replaces the current value.
		if d > 0 {
			cfg.timeout = d
		}
		return cfg
	}
	return periodicReaderOptionFunc(apply)
}
// WithInterval configures the intervening time between exports for a
// PeriodicReader.
//
// This option overrides any value set for the
// OTEL_METRIC_EXPORT_INTERVAL environment variable.
//
// If d is less than or equal to zero the option is a no-op: the interval
// already present in the configuration (seeded from the environment or, by
// default, 60 seconds) is kept.
func WithInterval(d time.Duration) PeriodicReaderOption {
	apply := func(cfg periodicReaderConfig) periodicReaderConfig {
		// Only a strictly positive duration replaces the current value.
		if d > 0 {
			cfg.interval = d
		}
		return cfg
	}
	return periodicReaderOptionFunc(apply)
}
// NewPeriodicReader returns a Reader that collects and exports metric data to
// the exporter at a defined interval. By default, the returned Reader will
// collect and export data every 60 seconds, and will cancel any attempts that
// exceed 30 seconds, collect and export combined. The collect and export time
// are not counted towards the interval between attempts.
//
// The Collect method of the returned Reader continues to gather and return
// metric data to the user. It will not automatically send that data to the
// exporter. That is left to the user to accomplish.
//
// A background goroutine running the export loop is started before this
// function returns; it exits when Shutdown cancels its context.
func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *PeriodicReader {
	cfg := newPeriodicReaderConfig(options)
	runCtx, stop := context.WithCancel(context.Background())

	reader := &PeriodicReader{
		exporter: exporter,
		interval: cfg.interval,
		timeout:  cfg.timeout,
		cancel:   stop,
		flushCh:  make(chan chan error),
		done:     make(chan struct{}),
		rmPool: sync.Pool{
			New: func() interface{} { return &metricdata.ResourceMetrics{} },
		},
	}
	reader.externalProducers.Store(cfg.producers)

	go func() {
		// Closing done signals that the run loop has fully exited.
		defer close(reader.done)
		reader.run(runCtx, cfg.interval)
	}()

	return reader
}
// PeriodicReader is a Reader that continuously collects and exports metric
// data at a set interval.
type PeriodicReader struct {
// sdkProducer holds a produceHolder once register has been called; Shutdown
// swaps in a holder backed by shutdownProducer.
sdkProducer atomic.Value
// mu guards isShutdown.
mu sync.Mutex
// isShutdown is set at the end of Shutdown and reported by MarshalLog.
isShutdown bool
// externalProducers holds a []Producer queried on every collection; Shutdown
// replaces it with an empty slice to release the references.
externalProducers atomic.Value
// interval is the configured time between exports (reported by MarshalLog).
interval time.Duration
// timeout bounds a collect-and-export attempt, and ForceFlush/Shutdown when
// the caller's context has no deadline.
timeout time.Duration
// exporter receives the collected metric data.
exporter Exporter
// flushCh carries flush requests to the run loop; each request is a channel
// on which the loop replies with the collect-and-export result.
flushCh chan chan error
// done is closed when the run loop goroutine has exited.
done chan struct{}
// cancel cancels the context the run loop is running with.
cancel context.CancelFunc
// shutdownOnce ensures the shutdown sequence is executed only once.
shutdownOnce sync.Once
// rmPool recycles *metricdata.ResourceMetrics values between collections.
rmPool sync.Pool
}
// Compile time check that *PeriodicReader implements Reader and is comparable
// (it is used here as a key of a map keyed by Reader).
var _ = map[Reader]struct{}{&PeriodicReader{}: {}}
// newTicker creates the ticker that drives the run loop. It is a package
// variable, defaulting to time.NewTicker, so that tests can override it.
var newTicker = time.NewTicker
// run is the reader's export loop. It collects and exports on every tick of
// a ticker created with interval, and also whenever a flush request arrives
// on flushCh. A flush request is answered on the channel it carried and
// resets the ticker, so the next periodic export happens a full interval
// after the flush. Errors from periodic exports are passed to otel.Handle.
//
// The loop returns once ctx is done.
func (r *PeriodicReader) run(ctx context.Context, interval time.Duration) {
	tick := newTicker(interval)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case reply := <-r.flushCh:
			reply <- r.collectAndExport(ctx)
			tick.Reset(interval)
		case <-tick.C:
			if err := r.collectAndExport(ctx); err != nil {
				otel.Handle(err)
			}
		}
	}
}
// register installs p as the SDK producer of this reader. Only the first
// registration takes effect; any later attempt leaves the existing producer
// in place and is reported through global.Error with errDuplicateRegister.
func (r *PeriodicReader) register(p sdkProducer) {
	holder := produceHolder{produce: p.produce}
	if r.sdkProducer.CompareAndSwap(nil, holder) {
		return
	}
	// A producer was already stored: report the duplicate and do nothing.
	global.Error(errDuplicateRegister, "did not register periodic reader")
}
// temporality reports the Temporality to use for the given instrument kind,
// as decided by the reader's exporter.
func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
	exp := r.exporter
	return exp.Temporality(kind)
}
// aggregation reports the Aggregation to use for the given instrument kind,
// as decided by the reader's exporter.
func (r *PeriodicReader) aggregation(kind InstrumentKind) Aggregation { // nolint:revive // import-shadow for method scoped by type.
	exp := r.exporter
	return exp.Aggregation(kind)
}
// collectAndExport gathers all metric data related to the PeriodicReader r
// from the SDK and exports it with r's exporter. Collection and export
// together are bounded by r.timeout. The export is skipped when collection
// fails, and the collection error is returned instead.
func (r *PeriodicReader) collectAndExport(ctx context.Context) error {
	ctx, cancel := context.WithTimeout(ctx, r.timeout)
	defer cancel()

	// Borrow a ResourceMetrics from the pool instead of allocating one for
	// every cycle; it is handed back once the export attempt is over.
	rm := r.rmPool.Get().(*metricdata.ResourceMetrics)

	var err error
	if err = r.Collect(ctx, rm); err == nil {
		err = r.export(ctx, rm)
	}

	r.rmPool.Put(rm)
	return err
}
// Collect gathers all metric data related to the Reader from
// the SDK and other Producers and stores the result in rm. The metric
// data is not exported to the configured exporter, it is left to the caller to
// handle that if desired.
//
// Collect will return an error if called after shutdown.
// Collect will return an error if rm is a nil ResourceMetrics.
// Collect will return an error if the context's Done channel is closed.
//
// This method is safe to call concurrently.
func (r *PeriodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
	if rm == nil {
		return errors.New("periodic reader: *metricdata.ResourceMetrics is nil")
	}

	// Read the currently registered producer (nil when none is registered)
	// and let collect validate and invoke it.
	producer := r.sdkProducer.Load()
	return r.collect(ctx, producer, rm)
}
// collect unwraps p as a produceHolder, stores its produce results in rm, and
// then appends the scope metrics of every external Producer.
//
// It returns ErrReaderNotRegistered when p is nil and an error describing the
// dynamic type of p when p is not a produceHolder. A failure of the SDK
// producer is returned immediately. Failures of external producers do not
// stop the collection: they are gathered and returned together through
// unifyErrors.
func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricdata.ResourceMetrics) error {
	if p == nil {
		return ErrReaderNotRegistered
	}

	holder, isHolder := p.(produceHolder)
	if !isHolder {
		// The atomic.Value is entirely under the PeriodicReader's control, so
		// this should never happen. Should it happen anyway, report an error
		// rather than panic so that the user's code is not halted.
		return fmt.Errorf("periodic reader: invalid producer: %T", p)
	}

	if err := holder.produce(ctx, rm); err != nil {
		return err
	}

	external := r.externalProducers.Load().([]Producer)
	var errs []error
	for i := range external {
		scopes, err := external[i].Produce(ctx)
		if err != nil {
			errs = append(errs, err)
		}
		// Whatever the producer returned is kept, even alongside an error.
		rm.ScopeMetrics = append(rm.ScopeMetrics, scopes...)
	}

	global.Debug("PeriodicReader collection", "Data", rm)
	return unifyErrors(errs)
}
// export hands the metric data m to r's exporter and returns the exporter's
// result.
func (r *PeriodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error {
	exp := r.exporter
	return exp.Export(ctx, m)
}
// ForceFlush flushes pending telemetry.
//
// It asks the run loop to collect and export immediately, waits for the
// result, and then calls ForceFlush on the exporter. If ctx has no deadline,
// the reader's configured timeout is applied. ErrReaderShutdown is returned
// when the run loop has already exited, and ctx's error is returned when ctx
// is done before the flush completes.
//
// This method is safe to call concurrently.
func (r *PeriodicReader) ForceFlush(ctx context.Context) error {
	// Prioritize the ctx timeout if it is set.
	if _, hasDeadline := ctx.Deadline(); !hasDeadline {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, r.timeout)
		defer cancel()
	}

	// Buffered so the run loop can always deliver its answer, even if this
	// call has already given up waiting.
	reply := make(chan error, 1)

	// Step 1: hand the request to the run loop.
	select {
	case r.flushCh <- reply:
	case <-r.done:
		return ErrReaderShutdown
	case <-ctx.Done():
		return ctx.Err()
	}

	// Step 2: wait for the outcome of the collect-and-export.
	select {
	case err := <-reply:
		if err != nil {
			return err
		}
		close(reply)
	case <-ctx.Done():
		return ctx.Err()
	}

	return r.exporter.ForceFlush(ctx)
}
// Shutdown flushes pending telemetry and then stops the export pipeline.
//
// Only the first call performs the shutdown sequence; every later call
// returns ErrReaderShutdown. If ctx has no deadline, the reader's configured
// timeout bounds the final flush and the exporter shutdown.
//
// An error from the final collect or export takes precedence over an error
// returned by the exporter's Shutdown.
//
// This method is safe to call concurrently.
func (r *PeriodicReader) Shutdown(ctx context.Context) error {
	err := ErrReaderShutdown
	r.shutdownOnce.Do(func() {
		// Prioritize the ctx timeout if it is set.
		if _, ok := ctx.Deadline(); !ok {
			var cancel context.CancelFunc
			ctx, cancel = context.WithTimeout(ctx, r.timeout)
			defer cancel()
		}

		// Stop the run loop and wait for its goroutine to exit so that no
		// periodic export overlaps with the final flush below.
		r.cancel()
		<-r.done

		// Route any future call to Collect to shutdownProducer (expected to
		// report ErrReaderShutdown) while keeping the previous producer for
		// one last collection.
		ph := r.sdkProducer.Swap(produceHolder{
			produce: shutdownProducer{}.produce,
		})

		if ph != nil { // Reader was registered.
			// Flush pending telemetry.
			m := r.rmPool.Get().(*metricdata.ResourceMetrics)
			err = r.collect(ctx, ph, m)
			if err == nil {
				err = r.export(ctx, m)
			}
			r.rmPool.Put(m)
		}

		// Always shut the exporter down. Its error is reported only when the
		// flush did not fail; errors.Is also recognizes a wrapped
		// ErrReaderShutdown as "nothing failed during the flush".
		sErr := r.exporter.Shutdown(ctx)
		if err == nil || errors.Is(err, ErrReaderShutdown) {
			err = sErr
		}

		r.mu.Lock()
		defer r.mu.Unlock()
		r.isShutdown = true
		// release references to Producer(s)
		r.externalProducers.Store([]Producer{})
	})
	return err
}
// MarshalLog returns logging data about the PeriodicReader: its type name,
// exporter, whether a producer has been registered, whether it has been shut
// down, and its configured interval and timeout.
func (r *PeriodicReader) MarshalLog() interface{} {
	// Snapshot the shutdown flag under the mutex that guards it.
	r.mu.Lock()
	shutdown := r.isShutdown
	r.mu.Unlock()

	var entry struct {
		Type       string
		Exporter   Exporter
		Registered bool
		Shutdown   bool
		Interval   time.Duration
		Timeout    time.Duration
	}
	entry.Type = "PeriodicReader"
	entry.Exporter = r.exporter
	entry.Registered = r.sdkProducer.Load() != nil
	entry.Shutdown = shutdown
	entry.Interval = r.interval
	entry.Timeout = r.timeout
	return entry
}