1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2024-11-24 08:22:25 +02:00

Add queue for BatchingProcessor (#5131)

This commit is contained in:
Tyler Yahn 2024-04-03 04:53:16 -07:00 committed by GitHub
parent 5449f083aa
commit 6c6e1e7416
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 200 additions and 0 deletions

View File

@ -4,7 +4,9 @@
package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"container/ring"
"context"
"sync"
"time"
)
@ -76,6 +78,91 @@ func (b *BatchingProcessor) ForceFlush(ctx context.Context) error {
return nil
}
// queue holds a queue of logging records.
//
// When the queue becomes full, the oldest records in the queue are
// overwritten.
type queue struct {
sync.Mutex
cap, len int
read, write *ring.Ring
}
func newQueue(size int) *queue {
r := ring.New(size)
return &queue{
cap: size,
read: r,
write: r,
}
}
// Enqueue adds r to the queue. The queue size, including the addition of r, is
// returned.
//
// If enqueueing r will exceed the capacity of q, the oldest Record held in q
// will be dropped and r retained.
func (q *queue) Enqueue(r Record) int {
q.Lock()
defer q.Unlock()
q.write.Value = r
q.write = q.write.Next()
q.len++
if q.len > q.cap {
// Overflow. Advance read to be the new "oldest".
q.len = q.cap
q.read = q.read.Next()
}
return q.len
}
// TryDequeue attempts to dequeue up to len(buf) Records. The available Records
// will be assigned into buf and passed to write. If write fails, returning
// false, the Records will not be removed from the queue. If write succeeds,
// returning true, the dequeued Records are removed from the queue. The number
// of Records remaining in the queue are returned.
//
// When write is called the lock of q is held. The write function must not call
// other methods of this q that acquire the lock.
func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int {
q.Lock()
defer q.Unlock()
origRead := q.read
n := min(len(buf), q.len)
for i := 0; i < n; i++ {
buf[i] = q.read.Value.(Record)
q.read = q.read.Next()
}
if write(buf[:n]) {
q.len -= n
} else {
q.read = origRead
}
return q.len
}
// Flush returns all the Records held in the queue and resets it to be
// empty.
func (q *queue) Flush() []Record {
q.Lock()
defer q.Unlock()
out := make([]Record, q.len)
for i := range out {
out[i] = q.read.Value.(Record)
q.read = q.read.Next()
}
q.len = 0
return out
}
type batchingConfig struct {
maxQSize setting[int]
expInterval setting[time.Duration]

View File

@ -4,13 +4,17 @@
package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"slices"
"strconv"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/log"
)
func TestNewBatchingConfig(t *testing.T) {
@ -126,3 +130,112 @@ func TestNewBatchingConfig(t *testing.T) {
})
}
}
func TestQueue(t *testing.T) {
var r Record
r.SetBody(log.BoolValue(true))
t.Run("newQueue", func(t *testing.T) {
const size = 1
q := newQueue(size)
assert.Equal(t, q.len, 0)
assert.Equal(t, size, q.cap, "capacity")
assert.Equal(t, size, q.read.Len(), "read ring")
assert.Same(t, q.read, q.write, "different rings")
})
t.Run("Enqueue", func(t *testing.T) {
const size = 2
q := newQueue(size)
var notR Record
notR.SetBody(log.IntValue(10))
assert.Equal(t, 1, q.Enqueue(notR), "incomplete batch")
assert.Equal(t, 1, q.len, "length")
assert.Equal(t, size, q.cap, "capacity")
assert.Equal(t, 2, q.Enqueue(r), "complete batch")
assert.Equal(t, 2, q.len, "length")
assert.Equal(t, size, q.cap, "capacity")
assert.Equal(t, 2, q.Enqueue(r), "overflow batch")
assert.Equal(t, 2, q.len, "length")
assert.Equal(t, size, q.cap, "capacity")
assert.Equal(t, []Record{r, r}, q.Flush(), "flushed Records")
})
t.Run("Flush", func(t *testing.T) {
const size = 2
q := newQueue(size)
q.write.Value = r
q.write = q.write.Next()
q.len = 1
assert.Equal(t, []Record{r}, q.Flush(), "flushed")
})
t.Run("TryFlush", func(t *testing.T) {
const size = 3
q := newQueue(size)
for i := 0; i < size-1; i++ {
q.write.Value = r
q.write = q.write.Next()
q.len++
}
buf := make([]Record, 1)
f := func([]Record) bool { return false }
assert.Equal(t, size-1, q.TryDequeue(buf, f), "not flushed")
require.Equal(t, size-1, q.len, "length")
require.NotSame(t, q.read, q.write, "read ring advanced")
var flushed []Record
f = func(r []Record) bool {
flushed = append(flushed, r...)
return true
}
if assert.Equal(t, size-2, q.TryDequeue(buf, f), "did not flush len(buf)") {
assert.Equal(t, []Record{r}, flushed, "Records")
}
buf = slices.Grow(buf, size)
flushed = flushed[:0]
if assert.Equal(t, 0, q.TryDequeue(buf, f), "did not flush len(queue)") {
assert.Equal(t, []Record{r}, flushed, "Records")
}
})
t.Run("ConcurrentSafe", func(t *testing.T) {
const goRoutines = 10
flushed := make(chan []Record, goRoutines)
out := make([]Record, 0, goRoutines)
done := make(chan struct{})
go func() {
defer close(done)
for recs := range flushed {
out = append(out, recs...)
}
}()
var wg sync.WaitGroup
wg.Add(goRoutines)
b := newQueue(goRoutines)
for i := 0; i < goRoutines; i++ {
go func() {
defer wg.Done()
b.Enqueue(Record{})
flushed <- b.Flush()
}()
}
wg.Wait()
close(flushed)
<-done
assert.Len(t, out, goRoutines, "flushed Records")
})
}