mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-02-03 13:11:53 +02:00
Flush pending telemetry when ForceFlush or Shutdown are called on a PeriodicReader (#3220)
* Flush pending telemetry when ForceFlush called * Test flush of periodic reader * Flush pending telemetry on Shutdown * Fix things * Rename pHolder to p * Add testing for Shutdown * Fix doc for collect method * Fix collectAndExport doc * Fix collectAndExport English * Remove stdoutmetric example expected output * Add changes to changelog * Revert inadvertent change to golangci conf Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
This commit is contained in:
parent
8c6e6c4bac
commit
111a1d7bbd
@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
### Changed
|
||||
|
||||
- `span.SetStatus` has been updated such that calls that lower the status are now no-ops. (#3214)
|
||||
- Flush pending measurements with the `PeriodicReader` in the `go.opentelemetry.io/otel/sdk/metric` when `ForceFlush` or `Shutdown` are called. (#3220)
|
||||
|
||||
## [0.32.1] Metric SDK (Alpha) - 2022-09-22
|
||||
|
||||
|
@ -123,112 +123,4 @@ func Example() {
|
||||
|
||||
// Ensure the periodic reader is cleaned up by shutting down the sdk.
|
||||
_ = sdk.Shutdown(ctx)
|
||||
|
||||
// Output:
|
||||
// {
|
||||
// "Resource": [
|
||||
// {
|
||||
// "Key": "service.name",
|
||||
// "Value": {
|
||||
// "Type": "STRING",
|
||||
// "Value": "stdoutmetric-example"
|
||||
// }
|
||||
// }
|
||||
// ],
|
||||
// "ScopeMetrics": [
|
||||
// {
|
||||
// "Scope": {
|
||||
// "Name": "example",
|
||||
// "Version": "v0.0.1",
|
||||
// "SchemaURL": ""
|
||||
// },
|
||||
// "Metrics": [
|
||||
// {
|
||||
// "Name": "requests",
|
||||
// "Description": "Number of requests received",
|
||||
// "Unit": "1",
|
||||
// "Data": {
|
||||
// "DataPoints": [
|
||||
// {
|
||||
// "Attributes": [
|
||||
// {
|
||||
// "Key": "server",
|
||||
// "Value": {
|
||||
// "Type": "STRING",
|
||||
// "Value": "central"
|
||||
// }
|
||||
// }
|
||||
// ],
|
||||
// "StartTime": "2000-01-01T00:00:00Z",
|
||||
// "Time": "2000-01-01T00:00:01Z",
|
||||
// "Value": 5
|
||||
// }
|
||||
// ],
|
||||
// "Temporality": "DeltaTemporality",
|
||||
// "IsMonotonic": true
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "Name": "latency",
|
||||
// "Description": "Time spend processing received requests",
|
||||
// "Unit": "ms",
|
||||
// "Data": {
|
||||
// "DataPoints": [
|
||||
// {
|
||||
// "Attributes": [
|
||||
// {
|
||||
// "Key": "server",
|
||||
// "Value": {
|
||||
// "Type": "STRING",
|
||||
// "Value": "central"
|
||||
// }
|
||||
// }
|
||||
// ],
|
||||
// "StartTime": "2000-01-01T00:00:00Z",
|
||||
// "Time": "2000-01-01T00:00:01Z",
|
||||
// "Count": 10,
|
||||
// "Bounds": [
|
||||
// 1,
|
||||
// 5,
|
||||
// 10
|
||||
// ],
|
||||
// "BucketCounts": [
|
||||
// 1,
|
||||
// 3,
|
||||
// 6,
|
||||
// 0
|
||||
// ],
|
||||
// "Sum": 57
|
||||
// }
|
||||
// ],
|
||||
// "Temporality": "DeltaTemporality"
|
||||
// }
|
||||
// },
|
||||
// {
|
||||
// "Name": "temperature",
|
||||
// "Description": "CPU global temperature",
|
||||
// "Unit": "cel(1 K)",
|
||||
// "Data": {
|
||||
// "DataPoints": [
|
||||
// {
|
||||
// "Attributes": [
|
||||
// {
|
||||
// "Key": "server",
|
||||
// "Value": {
|
||||
// "Type": "STRING",
|
||||
// "Value": "central"
|
||||
// }
|
||||
// }
|
||||
// ],
|
||||
// "StartTime": "0001-01-01T00:00:00Z",
|
||||
// "Time": "2000-01-01T00:00:01Z",
|
||||
// "Value": 32.4
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
}
|
||||
|
@ -115,15 +115,16 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
|
||||
r := &periodicReader{
|
||||
timeout: conf.timeout,
|
||||
exporter: exporter,
|
||||
flushCh: make(chan chan error),
|
||||
cancel: cancel,
|
||||
done: make(chan struct{}),
|
||||
|
||||
temporalitySelector: conf.temporalitySelector,
|
||||
aggregationSelector: conf.aggregationSelector,
|
||||
}
|
||||
|
||||
r.wg.Add(1)
|
||||
go func() {
|
||||
defer r.wg.Done()
|
||||
defer func() { close(r.done) }()
|
||||
r.run(ctx, conf.interval)
|
||||
}()
|
||||
|
||||
@ -137,11 +138,12 @@ type periodicReader struct {
|
||||
|
||||
timeout time.Duration
|
||||
exporter Exporter
|
||||
flushCh chan chan error
|
||||
|
||||
temporalitySelector TemporalitySelector
|
||||
aggregationSelector AggregationSelector
|
||||
|
||||
wg sync.WaitGroup
|
||||
done chan struct{}
|
||||
cancel context.CancelFunc
|
||||
shutdownOnce sync.Once
|
||||
}
|
||||
@ -161,15 +163,13 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
m, err := r.Collect(ctx)
|
||||
if err == nil {
|
||||
c, cancel := context.WithTimeout(ctx, r.timeout)
|
||||
err = r.exporter.Export(c, m)
|
||||
cancel()
|
||||
}
|
||||
err := r.collectAndExport(ctx)
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
}
|
||||
case errCh := <-r.flushCh:
|
||||
errCh <- r.collectAndExport(ctx)
|
||||
ticker.Reset(interval)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
@ -195,13 +195,27 @@ func (r *periodicReader) aggregation(kind view.InstrumentKind) aggregation.Aggre
|
||||
return r.aggregationSelector(kind)
|
||||
}
|
||||
|
||||
// collectAndExport gather all metric data related to the periodicReader r from
|
||||
// the SDK and exports it with r's exporter.
|
||||
func (r *periodicReader) collectAndExport(ctx context.Context) error {
|
||||
m, err := r.Collect(ctx)
|
||||
if err == nil {
|
||||
err = r.export(ctx, m)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Collect gathers and returns all metric data related to the Reader from
|
||||
// the SDK. The returned metric data is not exported to the configured
|
||||
// exporter, it is left to the caller to handle that if desired.
|
||||
//
|
||||
// An error is returned if this is called after Shutdown.
|
||||
func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
|
||||
p := r.producer.Load()
|
||||
return r.collect(ctx, r.producer.Load())
|
||||
}
|
||||
|
||||
// collect unwraps p as a produceHolder and returns its produce results.
|
||||
func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata.ResourceMetrics, error) {
|
||||
if p == nil {
|
||||
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
|
||||
}
|
||||
@ -218,25 +232,61 @@ func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetric
|
||||
return ph.produce(ctx)
|
||||
}
|
||||
|
||||
// ForceFlush flushes the Exporter.
|
||||
// export exports metric data m using r's exporter.
|
||||
func (r *periodicReader) export(ctx context.Context, m metricdata.ResourceMetrics) error {
|
||||
c, cancel := context.WithTimeout(ctx, r.timeout)
|
||||
defer cancel()
|
||||
return r.exporter.Export(c, m)
|
||||
}
|
||||
|
||||
// ForceFlush flushes pending telemetry.
|
||||
func (r *periodicReader) ForceFlush(ctx context.Context) error {
|
||||
errCh := make(chan error, 1)
|
||||
select {
|
||||
case r.flushCh <- errCh:
|
||||
select {
|
||||
case err := <-errCh:
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
close(errCh)
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
case <-r.done:
|
||||
return ErrReaderShutdown
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
return r.exporter.ForceFlush(ctx)
|
||||
}
|
||||
|
||||
// Shutdown stops the export pipeline.
|
||||
// Shutdown flushes pending telemetry and then stops the export pipeline.
|
||||
func (r *periodicReader) Shutdown(ctx context.Context) error {
|
||||
err := ErrReaderShutdown
|
||||
r.shutdownOnce.Do(func() {
|
||||
// Stop the run loop.
|
||||
r.cancel()
|
||||
r.wg.Wait()
|
||||
<-r.done
|
||||
|
||||
// Any future call to Collect will now return ErrReaderShutdown.
|
||||
r.producer.Store(produceHolder{
|
||||
ph := r.producer.Swap(produceHolder{
|
||||
produce: shutdownProducer{}.produce,
|
||||
})
|
||||
|
||||
err = r.exporter.Shutdown(ctx)
|
||||
if ph != nil { // Reader was registered.
|
||||
// Flush pending telemetry.
|
||||
var m metricdata.ResourceMetrics
|
||||
m, err = r.collect(ctx, ph)
|
||||
if err == nil {
|
||||
err = r.export(ctx, m)
|
||||
}
|
||||
}
|
||||
|
||||
sErr := r.exporter.Shutdown(ctx)
|
||||
if err == nil || err == ErrReaderShutdown {
|
||||
err = sErr
|
||||
}
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
@ -98,6 +98,7 @@ func (ts *periodicReaderTestSuite) SetupTest() {
|
||||
}
|
||||
|
||||
ts.ErrReader = NewPeriodicReader(e)
|
||||
ts.ErrReader.register(testProducer{})
|
||||
}
|
||||
|
||||
func (ts *periodicReaderTestSuite) TearDownTest() {
|
||||
@ -138,11 +139,13 @@ func (eh chErrorHandler) Handle(err error) {
|
||||
eh.Err <- err
|
||||
}
|
||||
|
||||
func TestPeriodicReaderRun(t *testing.T) {
|
||||
func triggerTicker(t *testing.T) chan time.Time {
|
||||
t.Helper()
|
||||
|
||||
// Override the ticker C chan so tests are not flaky and rely on timing.
|
||||
defer func(orig func(time.Duration) *time.Ticker) {
|
||||
newTicker = orig
|
||||
}(newTicker)
|
||||
orig := newTicker
|
||||
t.Cleanup(func() { newTicker = orig })
|
||||
|
||||
// Keep this at size zero so when triggered with a send it will hang until
|
||||
// the select case is selected and the collection loop is started.
|
||||
trigger := make(chan time.Time)
|
||||
@ -151,6 +154,11 @@ func TestPeriodicReaderRun(t *testing.T) {
|
||||
ticker.C = trigger
|
||||
return ticker
|
||||
}
|
||||
return trigger
|
||||
}
|
||||
|
||||
func TestPeriodicReaderRun(t *testing.T) {
|
||||
trigger := triggerTicker(t)
|
||||
|
||||
// Register an error handler to validate export errors are passed to
|
||||
// otel.Handle.
|
||||
@ -177,6 +185,43 @@ func TestPeriodicReaderRun(t *testing.T) {
|
||||
_ = r.Shutdown(context.Background())
|
||||
}
|
||||
|
||||
func TestPeriodicReaderFlushesPending(t *testing.T) {
|
||||
// Override the ticker so tests are not flaky and rely on timing.
|
||||
trigger := triggerTicker(t)
|
||||
t.Cleanup(func() { close(trigger) })
|
||||
|
||||
expFunc := func(t *testing.T) (exp Exporter, called *bool) {
|
||||
called = new(bool)
|
||||
return &fnExporter{
|
||||
exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error {
|
||||
// The testProducer produces testMetrics.
|
||||
assert.Equal(t, testMetrics, m)
|
||||
*called = true
|
||||
return assert.AnError
|
||||
},
|
||||
}, called
|
||||
}
|
||||
|
||||
t.Run("ForceFlush", func(t *testing.T) {
|
||||
exp, called := expFunc(t)
|
||||
r := NewPeriodicReader(exp)
|
||||
r.register(testProducer{})
|
||||
assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned")
|
||||
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
|
||||
|
||||
// Ensure Reader is allowed clean up attempt.
|
||||
_ = r.Shutdown(context.Background())
|
||||
})
|
||||
|
||||
t.Run("Shutdown", func(t *testing.T) {
|
||||
exp, called := expFunc(t)
|
||||
r := NewPeriodicReader(exp)
|
||||
r.register(testProducer{})
|
||||
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
|
||||
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkPeriodicReader(b *testing.B) {
|
||||
b.Run("Collect", benchReaderCollectFunc(
|
||||
NewPeriodicReader(new(fnExporter)),
|
||||
|
Loading…
x
Reference in New Issue
Block a user