mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-30 04:40:41 +02:00
sdk/metric: Remove Reader.ForceFlush and ManualReader.ForceFlush (#4375)
This commit is contained in:
parent
c513972985
commit
a9552aaffa
@ -45,6 +45,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
- `PeriodicReader.Shutdown` and `PeriodicReader.ForceFlush` in `go.opentelemetry.io/otel/sdk/metric` now apply the periodic reader's timeout to the operation if the user provided context does not contain a deadline. (#4356, #4377)
|
||||
- Upgrade all use of `go.opentelemetry.io/otel/semconv` to use `v1.21.0`. (#4408)
|
||||
|
||||
### Removed
|
||||
|
||||
- Remove `Reader.ForceFlush` in `go.opentelemetry.io/otel/metric`.
|
||||
Notice that `PeriodicReader.ForceFlush` is still available. (#4375)
|
||||
|
||||
### Fixed
|
||||
|
||||
- Correctly format log messages from the `go.opentelemetry.io/otel/exporters/zipkin` exporter. (#4143)
|
||||
|
@ -37,7 +37,9 @@ func (c config) readerSignals() (forceFlush, shutdown func(context.Context) erro
|
||||
var fFuncs, sFuncs []func(context.Context) error
|
||||
for _, r := range c.readers {
|
||||
sFuncs = append(sFuncs, r.Shutdown)
|
||||
fFuncs = append(fFuncs, r.ForceFlush)
|
||||
if f, ok := r.(interface{ ForceFlush(context.Context) error }); ok {
|
||||
fFuncs = append(fFuncs, f.ForceFlush)
|
||||
}
|
||||
}
|
||||
|
||||
return unify(fFuncs), unifyShutdown(sFuncs)
|
||||
|
@ -91,13 +91,6 @@ func (mr *ManualReader) aggregation(kind InstrumentKind) aggregation.Aggregation
|
||||
return mr.aggregationSelector(kind)
|
||||
}
|
||||
|
||||
// ForceFlush is a no-op, it always returns nil.
|
||||
//
|
||||
// This method is safe to call concurrently.
|
||||
func (mr *ManualReader) ForceFlush(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown closes any connections and frees any resources used by the reader.
|
||||
//
|
||||
// This method is safe to call concurrently.
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
@ -198,7 +199,7 @@ func (e *fnExporter) Shutdown(ctx context.Context) error {
|
||||
type periodicReaderTestSuite struct {
|
||||
*readerTestSuite
|
||||
|
||||
ErrReader Reader
|
||||
ErrReader *PeriodicReader
|
||||
}
|
||||
|
||||
func (ts *periodicReaderTestSuite) SetupTest() {
|
||||
@ -425,6 +426,15 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestPeriodicReaderMultipleForceFlush(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
r := NewPeriodicReader(new(fnExporter))
|
||||
r.register(testSDKProducer{})
|
||||
r.RegisterProducer(testExternalProducer{})
|
||||
require.NoError(t, r.ForceFlush(ctx))
|
||||
require.NoError(t, r.ForceFlush(ctx))
|
||||
}
|
||||
|
||||
func BenchmarkPeriodicReader(b *testing.B) {
|
||||
b.Run("Collect", benchReaderCollectFunc(
|
||||
NewPeriodicReader(new(fnExporter)),
|
||||
|
@ -110,6 +110,9 @@ func (mp *MeterProvider) Meter(name string, options ...metric.MeterOption) metri
|
||||
// telemetry be flushed or all resources have been released in these
|
||||
// situations.
|
||||
//
|
||||
// ForceFlush calls ForceFlush(context.Context) error
|
||||
// on all Readers that implements this method.
|
||||
//
|
||||
// This method is safe to call concurrently.
|
||||
func (mp *MeterProvider) ForceFlush(ctx context.Context) error {
|
||||
if mp.forceFlush != nil {
|
||||
|
@ -84,16 +84,6 @@ type Reader interface {
|
||||
// passed context is expected to be honored.
|
||||
Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error
|
||||
|
||||
// ForceFlush flushes all metric measurements held in an export pipeline.
|
||||
//
|
||||
// This deadline or cancellation of the passed context are honored. An appropriate
|
||||
// error will be returned in these situations. There is no guaranteed that all
|
||||
// telemetry be flushed or all resources have been released in these
|
||||
// situations.
|
||||
//
|
||||
// This method needs to be concurrent safe.
|
||||
ForceFlush(context.Context) error
|
||||
|
||||
// Shutdown flushes all metric measurements held in an export pipeline and releases any
|
||||
// held computational resources.
|
||||
//
|
||||
|
@ -93,14 +93,6 @@ func (ts *readerTestSuite) TestShutdownTwice() {
|
||||
ts.ErrorIs(ts.Reader.Shutdown(ctx), ErrReaderShutdown)
|
||||
}
|
||||
|
||||
func (ts *readerTestSuite) TestMultipleForceFlush() {
|
||||
ctx := context.Background()
|
||||
ts.Reader.register(testSDKProducer{})
|
||||
ts.Reader.RegisterProducer(testExternalProducer{})
|
||||
ts.Require().NoError(ts.Reader.ForceFlush(ctx))
|
||||
ts.NoError(ts.Reader.ForceFlush(ctx))
|
||||
}
|
||||
|
||||
func (ts *readerTestSuite) TestMultipleRegister() {
|
||||
p0 := testSDKProducer{
|
||||
produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
|
||||
@ -186,11 +178,13 @@ func (ts *readerTestSuite) TestMethodConcurrentSafe() {
|
||||
_ = ts.Reader.Collect(ctx, nil)
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_ = ts.Reader.ForceFlush(ctx)
|
||||
}()
|
||||
if f, ok := ts.Reader.(interface{ ForceFlush(context.Context) error }); ok {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_ = f.ForceFlush(ctx)
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
|
Loading…
x
Reference in New Issue
Block a user