From a9552aaffa8266578d4195db5f844583749a6d01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Fri, 11 Aug 2023 09:22:21 +0200 Subject: [PATCH] sdk/metric: Remove Reader.ForceFlush and ManualReader.ForceFlush (#4375) --- CHANGELOG.md | 5 +++++ sdk/metric/config.go | 4 +++- sdk/metric/manual_reader.go | 7 ------- sdk/metric/periodic_reader_test.go | 12 +++++++++++- sdk/metric/provider.go | 3 +++ sdk/metric/reader.go | 10 ---------- sdk/metric/reader_test.go | 20 +++++++------------- 7 files changed, 29 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a262e1c86..53588f04d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `PeriodicReader.Shutdown` and `PeriodicReader.ForceFlush` in `go.opentelemetry.io/otel/sdk/metric` now apply the periodic reader's timeout to the operation if the user provided context does not contain a deadline. (#4356, #4377) - Upgrade all use of `go.opentelemetry.io/otel/semconv` to use `v1.21.0`. (#4408) +### Removed + +- Remove `Reader.ForceFlush` in `go.opentelemetry.io/otel/metric`. + Notice that `PeriodicReader.ForceFlush` is still available. (#4375) + ### Fixed - Correctly format log messages from the `go.opentelemetry.io/otel/exporters/zipkin` exporter. (#4143) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index c837df8b7..0b1911284 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -37,7 +37,9 @@ func (c config) readerSignals() (forceFlush, shutdown func(context.Context) erro var fFuncs, sFuncs []func(context.Context) error for _, r := range c.readers { sFuncs = append(sFuncs, r.Shutdown) - fFuncs = append(fFuncs, r.ForceFlush) + if f, ok := r.(interface{ ForceFlush(context.Context) error }); ok { + fFuncs = append(fFuncs, f.ForceFlush) + } } return unify(fFuncs), unifyShutdown(sFuncs) diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 898af86ed..a7715f5b3 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -91,13 +91,6 @@ func (mr *ManualReader) aggregation(kind InstrumentKind) aggregation.Aggregation return mr.aggregationSelector(kind) } -// ForceFlush is a no-op, it always returns nil. -// -// This method is safe to call concurrently. -func (mr *ManualReader) ForceFlush(context.Context) error { - return nil -} - // Shutdown closes any connections and frees any resources used by the reader. // // This method is safe to call concurrently. diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index 0a34b7e99..0e9d66944 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "go.opentelemetry.io/otel" @@ -198,7 +199,7 @@ func (e *fnExporter) Shutdown(ctx context.Context) error { type periodicReaderTestSuite struct { *readerTestSuite - ErrReader Reader + ErrReader *PeriodicReader } func (ts *periodicReaderTestSuite) SetupTest() { @@ -425,6 +426,15 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { }) } +func TestPeriodicReaderMultipleForceFlush(t *testing.T) { + ctx := context.Background() + r := NewPeriodicReader(new(fnExporter)) + r.register(testSDKProducer{}) + r.RegisterProducer(testExternalProducer{}) + require.NoError(t, r.ForceFlush(ctx)) + require.NoError(t, r.ForceFlush(ctx)) +} + func BenchmarkPeriodicReader(b *testing.B) { b.Run("Collect", benchReaderCollectFunc( NewPeriodicReader(new(fnExporter)), diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index 49dd071c9..7d1a9183c 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -110,6 +110,9 @@ func (mp *MeterProvider) Meter(name string, options ...metric.MeterOption) metri // telemetry be flushed or all resources have been released in these // situations. // +// ForceFlush calls ForceFlush(context.Context) error +// on all Readers that implements this method. +// // This method is safe to call concurrently. func (mp *MeterProvider) ForceFlush(ctx context.Context) error { if mp.forceFlush != nil { diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index da68ef336..44dee654b 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -84,16 +84,6 @@ type Reader interface { // passed context is expected to be honored. Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error - // ForceFlush flushes all metric measurements held in an export pipeline. - // - // This deadline or cancellation of the passed context are honored. An appropriate - // error will be returned in these situations. There is no guaranteed that all - // telemetry be flushed or all resources have been released in these - // situations. - // - // This method needs to be concurrent safe. - ForceFlush(context.Context) error - // Shutdown flushes all metric measurements held in an export pipeline and releases any // held computational resources. // diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 8904d68ee..f2a20c0da 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -93,14 +93,6 @@ func (ts *readerTestSuite) TestShutdownTwice() { ts.ErrorIs(ts.Reader.Shutdown(ctx), ErrReaderShutdown) } -func (ts *readerTestSuite) TestMultipleForceFlush() { - ctx := context.Background() - ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) - ts.Require().NoError(ts.Reader.ForceFlush(ctx)) - ts.NoError(ts.Reader.ForceFlush(ctx)) -} - func (ts *readerTestSuite) TestMultipleRegister() { p0 := testSDKProducer{ produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { @@ -186,11 +178,13 @@ func (ts *readerTestSuite) TestMethodConcurrentSafe() { _ = ts.Reader.Collect(ctx, nil) }() - wg.Add(1) - go func() { - defer wg.Done() - _ = ts.Reader.ForceFlush(ctx) - }() + if f, ok := ts.Reader.(interface{ ForceFlush(context.Context) error }); ok { + wg.Add(1) + go func() { + defer wg.Done() + _ = f.ForceFlush(ctx) + }() + } wg.Add(1) go func() {