mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-20 03:30:02 +02:00
PeriodicReader.Shutdown now applies the periodic reader's timeout by default (#4356)
This commit is contained in:
parent
d8d3502efc
commit
c1a644a10c
@ -35,6 +35,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
|||||||
- If an attribute set is Observed multiple times in an async callback, the values will be summed instead of the last observation winning. (#4289)
|
- If an attribute set is Observed multiple times in an async callback, the values will be summed instead of the last observation winning. (#4289)
|
||||||
- Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332)
|
- Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332)
|
||||||
- Restrict `Meter`s in `go.opentelemetry.io/otel/sdk/metric` to only register and collect instruments it created. (#4333)
|
- Restrict `Meter`s in `go.opentelemetry.io/otel/sdk/metric` to only register and collect instruments it created. (#4333)
|
||||||
|
- `PeriodicReader.Shutdown` in `go.opentelemetry.io/otel/sdk/metric` now applies the periodic reader's timeout by default. (#4356)
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
|
||||||
|
@ -67,7 +67,8 @@ func (o periodicReaderOptionFunc) applyPeriodic(conf periodicReaderConfig) perio
|
|||||||
}
|
}
|
||||||
|
|
||||||
// WithTimeout configures the time a PeriodicReader waits for an export to
|
// WithTimeout configures the time a PeriodicReader waits for an export to
|
||||||
// complete before canceling it.
|
// complete before canceling it. This includes an export which occurs as part
|
||||||
|
// of Shutdown.
|
||||||
//
|
//
|
||||||
// This option overrides any value set for the
|
// This option overrides any value set for the
|
||||||
// OTEL_METRIC_EXPORT_TIMEOUT environment variable.
|
// OTEL_METRIC_EXPORT_TIMEOUT environment variable.
|
||||||
@ -323,6 +324,8 @@ func (r *PeriodicReader) ForceFlush(ctx context.Context) error {
|
|||||||
func (r *PeriodicReader) Shutdown(ctx context.Context) error {
|
func (r *PeriodicReader) Shutdown(ctx context.Context) error {
|
||||||
err := ErrReaderShutdown
|
err := ErrReaderShutdown
|
||||||
r.shutdownOnce.Do(func() {
|
r.shutdownOnce.Do(func() {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, r.timeout)
|
||||||
|
defer cancel()
|
||||||
// Stop the run loop.
|
// Stop the run loop.
|
||||||
r.cancel()
|
r.cancel()
|
||||||
<-r.done
|
<-r.done
|
||||||
|
@ -337,6 +337,46 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
|
|||||||
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
|
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
|
||||||
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
|
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("Shutdown timeout on producer", func(t *testing.T) {
|
||||||
|
exp, called := expFunc(t)
|
||||||
|
timeout := time.Millisecond
|
||||||
|
r := NewPeriodicReader(exp, WithTimeout(timeout))
|
||||||
|
r.register(testSDKProducer{
|
||||||
|
produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
|
||||||
|
select {
|
||||||
|
case <-time.After(timeout + time.Second):
|
||||||
|
*rm = testResourceMetricsA
|
||||||
|
case <-ctx.Done():
|
||||||
|
// we timed out before we could collect metrics
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}})
|
||||||
|
r.RegisterProducer(testExternalProducer{})
|
||||||
|
assert.Equal(t, context.DeadlineExceeded, r.Shutdown(context.Background()), "timeout error not returned")
|
||||||
|
assert.False(t, *called, "exporter Export method called when it should have failed before export")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Shutdown timeout on external producer", func(t *testing.T) {
|
||||||
|
exp, called := expFunc(t)
|
||||||
|
timeout := time.Millisecond
|
||||||
|
r := NewPeriodicReader(exp, WithTimeout(timeout))
|
||||||
|
r.register(testSDKProducer{})
|
||||||
|
r.RegisterProducer(testExternalProducer{
|
||||||
|
produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
|
||||||
|
select {
|
||||||
|
case <-time.After(timeout + time.Second):
|
||||||
|
case <-ctx.Done():
|
||||||
|
// we timed out before we could collect metrics
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
return []metricdata.ScopeMetrics{testScopeMetricsA}, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
assert.Equal(t, context.DeadlineExceeded, r.Shutdown(context.Background()), "timeout error not returned")
|
||||||
|
assert.False(t, *called, "exporter Export method called when it should have failed before export")
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkPeriodicReader(b *testing.B) {
|
func BenchmarkPeriodicReader(b *testing.B) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user