1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2026-06-03 18:35:08 +02:00

Remove the Flush method from Exemplar (#4873)

Co-authored-by: Robert Pająk <pellared@hotmail.com>
This commit is contained in:
Tyler Yahn
2024-01-31 14:07:40 -08:00
committed by GitHub
parent fecb92e366
commit 242d23a181
6 changed files with 6 additions and 95 deletions
-5
View File
@@ -34,8 +34,3 @@ func (r *dropRes[N]) Offer(context.Context, time.Time, N, []attribute.KeyValue)
func (r *dropRes[N]) Collect(dest *[]metricdata.Exemplar[N]) { func (r *dropRes[N]) Collect(dest *[]metricdata.Exemplar[N]) {
*dest = (*dest)[:0] *dest = (*dest)[:0]
} }
// Flush resets dest. No exemplars will ever be returned.
func (r *dropRes[N]) Flush(dest *[]metricdata.Exemplar[N]) {
*dest = (*dest)[:0]
}
@@ -44,9 +44,6 @@ func testSampledFiltered[N int64 | float64](t *testing.T) {
r.Collect(nil) r.Collect(nil)
assert.True(t, under.CollectCalled, "underlying Reservoir Collect not called") assert.True(t, under.CollectCalled, "underlying Reservoir Collect not called")
r.Flush(nil)
assert.True(t, under.FlushCalled, "underlying Reservoir Flush not called")
} }
func sample(parent context.Context) context.Context { func sample(parent context.Context) context.Context {
@@ -61,7 +58,6 @@ func sample(parent context.Context) context.Context {
type res[N int64 | float64] struct { type res[N int64 | float64] struct {
OfferCalled bool OfferCalled bool
CollectCalled bool CollectCalled bool
FlushCalled bool
} }
func (r *res[N]) Offer(context.Context, time.Time, N, []attribute.KeyValue) { func (r *res[N]) Offer(context.Context, time.Time, N, []attribute.KeyValue) {
@@ -71,7 +67,3 @@ func (r *res[N]) Offer(context.Context, time.Time, N, []attribute.KeyValue) {
func (r *res[N]) Collect(*[]metricdata.Exemplar[N]) { func (r *res[N]) Collect(*[]metricdata.Exemplar[N]) {
r.CollectCalled = true r.CollectCalled = true
} }
func (r *res[N]) Flush(*[]metricdata.Exemplar[N]) {
r.FlushCalled = true
}
-5
View File
@@ -193,8 +193,3 @@ func (r *randRes[N]) Collect(dest *[]metricdata.Exemplar[N]) {
// measurements that are made over the older collection cycle ones. // measurements that are made over the older collection cycle ones.
r.reset() r.reset()
} }
func (r *randRes[N]) Flush(dest *[]metricdata.Exemplar[N]) {
r.storage.Flush(dest)
r.reset()
}
+1 -8
View File
@@ -39,13 +39,6 @@ type Reservoir[N int64 | float64] interface {
// Collect returns all the held exemplars. // Collect returns all the held exemplars.
// //
// The Reservoir state is preserved after this call. See Flush to // The Reservoir state is preserved after this call.
// copy-and-clear instead.
Collect(dest *[]metricdata.Exemplar[N]) Collect(dest *[]metricdata.Exemplar[N])
// Flush returns all the held exemplars.
//
// The Reservoir state is reset after this call. See Collect to preserve
// the state instead.
Flush(dest *[]metricdata.Exemplar[N])
} }
+4 -46
View File
@@ -92,25 +92,6 @@ func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) {
assert.Equal(t, want, dest[0]) assert.Equal(t, want, dest[0])
}) })
t.Run("CollectDoesNotFlush", func(t *testing.T) {
t.Helper()
r, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}
r.Offer(ctx, staticTime, 10, nil)
var dest []metricdata.Exemplar[N]
r.Collect(&dest)
require.Len(t, dest, 1, "number of collected exemplars")
dest = dest[:0]
r.Collect(&dest)
assert.Len(t, dest, 1, "Collect flushed reservoir")
})
t.Run("CollectLessThanN", func(t *testing.T) { t.Run("CollectLessThanN", func(t *testing.T) {
t.Helper() t.Helper()
@@ -127,24 +108,6 @@ func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) {
require.Len(t, dest, 1, "number of collected exemplars") require.Len(t, dest, 1, "number of collected exemplars")
}) })
t.Run("FlushFlushes", func(t *testing.T) {
t.Helper()
r, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}
r.Offer(ctx, staticTime, 10, nil)
var dest []metricdata.Exemplar[N]
r.Flush(&dest)
require.Len(t, dest, 1, "number of flushed exemplars")
r.Flush(&dest)
assert.Len(t, dest, 0, "Flush did not flush reservoir")
})
t.Run("MultipleOffers", func(t *testing.T) { t.Run("MultipleOffers", func(t *testing.T) {
t.Helper() t.Helper()
@@ -159,17 +122,17 @@ func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) {
} }
var dest []metricdata.Exemplar[N] var dest []metricdata.Exemplar[N]
r.Flush(&dest) r.Collect(&dest)
assert.Len(t, dest, n, "multiple offers did not fill reservoir") assert.Len(t, dest, n, "multiple offers did not fill reservoir")
// Ensure the flush reset also resets any couting state. // Ensure the collect reset also resets any couting state.
for i := 0; i < n+1; i++ { for i := 0; i < n+1; i++ {
v := N(2 * i) v := N(i)
r.Offer(ctx, staticTime, v, nil) r.Offer(ctx, staticTime, v, nil)
} }
dest = dest[:0] dest = dest[:0]
r.Flush(&dest) r.Collect(&dest)
assert.Len(t, dest, n, "internal count state not reset") assert.Len(t, dest, n, "internal count state not reset")
}) })
@@ -186,11 +149,6 @@ func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) {
dest := []metricdata.Exemplar[N]{{}} // Should be reset to empty. dest := []metricdata.Exemplar[N]{{}} // Should be reset to empty.
r.Collect(&dest) r.Collect(&dest)
assert.Len(t, dest, 0, "no exemplars should be collected") assert.Len(t, dest, 0, "no exemplars should be collected")
r.Offer(context.Background(), staticTime, 10, nil)
dest = []metricdata.Exemplar[N]{{}} // Should be reset to empty.
r.Flush(&dest)
assert.Len(t, dest, 0, "no exemplars should be flushed")
}) })
} }
} }
+1 -23
View File
@@ -38,8 +38,7 @@ func newStorage[N int64 | float64](n int) *storage[N] {
// Collect returns all the held exemplars. // Collect returns all the held exemplars.
// //
// The Reservoir state is preserved after this call. See Flush to // The Reservoir state is preserved after this call.
// copy-and-clear instead.
func (r *storage[N]) Collect(dest *[]metricdata.Exemplar[N]) { func (r *storage[N]) Collect(dest *[]metricdata.Exemplar[N]) {
*dest = reset(*dest, len(r.store), len(r.store)) *dest = reset(*dest, len(r.store), len(r.store))
var n int var n int
@@ -54,27 +53,6 @@ func (r *storage[N]) Collect(dest *[]metricdata.Exemplar[N]) {
*dest = (*dest)[:n] *dest = (*dest)[:n]
} }
// Flush returns all the held exemplars.
//
// The Reservoir state is reset after this call. See Collect to preserve the
// state instead.
func (r *storage[N]) Flush(dest *[]metricdata.Exemplar[N]) {
*dest = reset(*dest, len(r.store), len(r.store))
var n int
for i, m := range r.store {
if !m.valid {
continue
}
m.Exemplar(&(*dest)[n])
n++
// Reset.
r.store[i] = measurement[N]{}
}
*dest = (*dest)[:n]
}
// measurement is a measurement made by a telemetry system. // measurement is a measurement made by a telemetry system.
type measurement[N int64 | float64] struct { type measurement[N int64 | float64] struct {
// FilteredAttributes are the attributes dropped during the measurement. // FilteredAttributes are the attributes dropped during the measurement.