// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package log // import "go.opentelemetry.io/otel/sdk/log" import ( "context" "errors" "slices" "sync" "sync/atomic" "time" "go.opentelemetry.io/otel/internal/global" ) const ( dfltMaxQSize = 2048 dfltExpInterval = time.Second dfltExpTimeout = 30 * time.Second dfltExpMaxBatchSize = 512 envarMaxQSize = "OTEL_BLRP_MAX_QUEUE_SIZE" envarExpInterval = "OTEL_BLRP_SCHEDULE_DELAY" envarExpTimeout = "OTEL_BLRP_EXPORT_TIMEOUT" envarExpMaxBatchSize = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE" ) // Compile-time check BatchProcessor implements Processor. var _ Processor = (*BatchProcessor)(nil) // BatchProcessor is a processor that exports batches of log records. // // Use [NewBatchProcessor] to create a BatchProcessor. An empty BatchProcessor // is shut down by default, no records will be batched or exported. type BatchProcessor struct { // The BatchProcessor is designed to provide the highest throughput of // log records possible while being compatible with OpenTelemetry. The // entry point of log records is the OnEmit method. This method is designed // to receive records as fast as possible while still honoring shutdown // commands. All records received are enqueued to queue. // // In order to block OnEmit as little as possible, a separate "poll" // goroutine is spawned at the creation of a BatchProcessor. This // goroutine is responsible for batching the queue at regular polled // intervals, or when it is directly signaled to. // // To keep the polling goroutine from backing up, all batches it makes are // exported with a bufferedExporter. This exporter allows the poll // goroutine to enqueue an export payload that will be handled in a // separate goroutine dedicated to the export. This asynchronous behavior // allows the poll goroutine to maintain accurate interval polling. // // ___BatchProcessor____ __Poll Goroutine__ __Export Goroutine__ // || || || || || || // || ********** || || || || ********** || // || Records=>* OnEmit * || || | - ticker || || * export * || // || ********** || || | - trigger || || ********** || // || || || || | || || || || // || || || || | || || || || // || __________\/___ || || |*********** || || ______/\_______ || // || (____queue______)>=||=||===|* batch *===||=||=>[_export_buffer_] || // || || || |*********** || || || // ||_____________________|| ||__________________|| ||____________________|| // // // The "release valve" in this processing is the record queue. This queue // is a ring buffer. It will overwrite the oldest records first when writes // to OnEmit are made faster than the queue can be flushed. If batches // cannot be flushed to the export buffer, the records will remain in the // queue. // exporter is the bufferedExporter all batches are exported with. exporter *bufferExporter // q is the active queue of records that have not yet been exported. q *queue // batchSize is the minimum number of records needed before an export is // triggered (unless the interval expires). batchSize int // pollTrigger triggers the poll goroutine to flush a batch from the queue. // This is sent to when it is known that the queue contains at least one // complete batch. // // When a send is made to the channel, the poll loop will be reset after // the flush. If there is still enough records in the queue for another // batch the reset of the poll loop will automatically re-trigger itself. // There is no need for the original sender to monitor and resend. pollTrigger chan struct{} // pollKill kills the poll goroutine. This is only expected to be closed // once by the Shutdown method. pollKill chan struct{} // pollDone signals the poll goroutine has completed. pollDone chan struct{} // stopped holds the stopped state of the BatchProcessor. stopped atomic.Bool } // NewBatchProcessor decorates the provided exporter // so that the log records are batched before exporting. // // All of the exporter's methods are called synchronously. func NewBatchProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchProcessor { cfg := newBatchConfig(opts) if exporter == nil { // Do not panic on nil export. exporter = defaultNoopExporter } // Order is important here. Wrap the timeoutExporter with the chunkExporter // to ensure each export completes in timeout (instead of all chunked // exports). exporter = newTimeoutExporter(exporter, cfg.expTimeout.Value) // Use a chunkExporter to ensure ForceFlush and Shutdown calls are batched // appropriately on export. exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value) b := &BatchProcessor{ // TODO: explore making the size of this configurable. exporter: newBufferExporter(exporter, 1), q: newQueue(cfg.maxQSize.Value), batchSize: cfg.expMaxBatchSize.Value, pollTrigger: make(chan struct{}, 1), pollKill: make(chan struct{}), } b.pollDone = b.poll(cfg.expInterval.Value) return b } // poll spawns a goroutine to handle interval polling and batch exporting. The // returned done chan is closed when the spawned goroutine completes. func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { done = make(chan struct{}) ticker := time.NewTicker(interval) // TODO: investigate using a sync.Pool instead of cloning. buf := make([]Record, b.batchSize) go func() { defer close(done) defer ticker.Stop() for { select { case <-ticker.C: case <-b.pollTrigger: ticker.Reset(interval) case <-b.pollKill: return } if d := b.q.Dropped(); d > 0 { global.Warn("dropped log records", "dropped", d) } qLen := b.q.TryDequeue(buf, func(r []Record) bool { ok := b.exporter.EnqueueExport(r) if ok { buf = slices.Clone(buf) } return ok }) if qLen >= b.batchSize { // There is another full batch ready. Immediately trigger // another export attempt. select { case b.pollTrigger <- struct{}{}: default: // Another flush signal already received. } } } }() return done } // OnEmit batches provided log record. func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error { if b.stopped.Load() || b.q == nil { return nil } if n := b.q.Enqueue(r); n >= b.batchSize { select { case b.pollTrigger <- struct{}{}: default: // Flush chan full. The poll goroutine will handle this by // re-sending any trigger until the queue has less than batchSize // records. } } return nil } // Enabled returns if b is enabled. func (b *BatchProcessor) Enabled(context.Context, Record) bool { return !b.stopped.Load() && b.q != nil } // Shutdown flushes queued log records and shuts down the decorated exporter. func (b *BatchProcessor) Shutdown(ctx context.Context) error { if b.stopped.Swap(true) || b.q == nil { return nil } // Stop the poll goroutine. close(b.pollKill) select { case <-b.pollDone: case <-ctx.Done(): // Out of time. return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx)) } // Flush remaining queued before exporter shutdown. err := b.exporter.Export(ctx, b.q.Flush()) return errors.Join(err, b.exporter.Shutdown(ctx)) } var errPartialFlush = errors.New("partial flush: export buffer full") // Used for testing. var ctxErr = func(ctx context.Context) error { return ctx.Err() } // ForceFlush flushes queued log records and flushes the decorated exporter. func (b *BatchProcessor) ForceFlush(ctx context.Context) error { if b.stopped.Load() || b.q == nil { return nil } buf := make([]Record, b.q.cap) notFlushed := func() bool { var flushed bool _ = b.q.TryDequeue(buf, func(r []Record) bool { flushed = b.exporter.EnqueueExport(r) return flushed }) return !flushed } var err error // For as long as ctx allows, try to make a single flush of the queue. for notFlushed() { // Use ctxErr instead of calling ctx.Err directly so we can test // the partial error return. if e := ctxErr(ctx); e != nil { err = errors.Join(e, errPartialFlush) break } } return errors.Join(err, b.exporter.ForceFlush(ctx)) } // queue holds a queue of logging records. // // When the queue becomes full, the oldest records in the queue are // overwritten. type queue struct { sync.Mutex dropped atomic.Uint64 cap, len int read, write *ring } func newQueue(size int) *queue { r := newRing(size) return &queue{ cap: size, read: r, write: r, } } // Dropped returns the number of Records dropped during enqueueing since the // last time Dropped was called. func (q *queue) Dropped() uint64 { return q.dropped.Swap(0) } // Enqueue adds r to the queue. The queue size, including the addition of r, is // returned. // // If enqueueing r will exceed the capacity of q, the oldest Record held in q // will be dropped and r retained. func (q *queue) Enqueue(r Record) int { q.Lock() defer q.Unlock() q.write.Value = r q.write = q.write.Next() q.len++ if q.len > q.cap { // Overflow. Advance read to be the new "oldest". q.len = q.cap q.read = q.read.Next() q.dropped.Add(1) } return q.len } // TryDequeue attempts to dequeue up to len(buf) Records. The available Records // will be assigned into buf and passed to write. If write fails, returning // false, the Records will not be removed from the queue. If write succeeds, // returning true, the dequeued Records are removed from the queue. The number // of Records remaining in the queue are returned. // // When write is called the lock of q is held. The write function must not call // other methods of this q that acquire the lock. func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int { q.Lock() defer q.Unlock() origRead := q.read n := min(len(buf), q.len) for i := 0; i < n; i++ { buf[i] = q.read.Value q.read = q.read.Next() } if write(buf[:n]) { q.len -= n } else { q.read = origRead } return q.len } // Flush returns all the Records held in the queue and resets it to be // empty. func (q *queue) Flush() []Record { q.Lock() defer q.Unlock() out := make([]Record, q.len) for i := range out { out[i] = q.read.Value q.read = q.read.Next() } q.len = 0 return out } type batchConfig struct { maxQSize setting[int] expInterval setting[time.Duration] expTimeout setting[time.Duration] expMaxBatchSize setting[int] } func newBatchConfig(options []BatchProcessorOption) batchConfig { var c batchConfig for _, o := range options { c = o.apply(c) } c.maxQSize = c.maxQSize.Resolve( clearLessThanOne[int](), getenv[int](envarMaxQSize), clearLessThanOne[int](), fallback[int](dfltMaxQSize), ) c.expInterval = c.expInterval.Resolve( clearLessThanOne[time.Duration](), getenv[time.Duration](envarExpInterval), clearLessThanOne[time.Duration](), fallback[time.Duration](dfltExpInterval), ) c.expTimeout = c.expTimeout.Resolve( clearLessThanOne[time.Duration](), getenv[time.Duration](envarExpTimeout), clearLessThanOne[time.Duration](), fallback[time.Duration](dfltExpTimeout), ) c.expMaxBatchSize = c.expMaxBatchSize.Resolve( clearLessThanOne[int](), getenv[int](envarExpMaxBatchSize), clearLessThanOne[int](), clampMax[int](c.maxQSize.Value), fallback[int](dfltExpMaxBatchSize), ) return c } // BatchProcessorOption applies a configuration to a [BatchProcessor]. type BatchProcessorOption interface { apply(batchConfig) batchConfig } type batchOptionFunc func(batchConfig) batchConfig func (fn batchOptionFunc) apply(c batchConfig) batchConfig { return fn(c) } // WithMaxQueueSize sets the maximum queue size used by the Batcher. // After the size is reached log records are dropped. // // If the OTEL_BLRP_MAX_QUEUE_SIZE environment variable is set, // and this option is not passed, that variable value will be used. // // By default, if an environment variable is not set, and this option is not // passed, 2048 will be used. // The default value is also used when the provided value is less than one. func WithMaxQueueSize(size int) BatchProcessorOption { return batchOptionFunc(func(cfg batchConfig) batchConfig { cfg.maxQSize = newSetting(size) return cfg }) } // WithExportInterval sets the maximum duration between batched exports. // // If the OTEL_BLRP_SCHEDULE_DELAY environment variable is set, // and this option is not passed, that variable value will be used. // // By default, if an environment variable is not set, and this option is not // passed, 1s will be used. // The default value is also used when the provided value is less than one. func WithExportInterval(d time.Duration) BatchProcessorOption { return batchOptionFunc(func(cfg batchConfig) batchConfig { cfg.expInterval = newSetting(d) return cfg }) } // WithExportTimeout sets the duration after which a batched export is canceled. // // If the OTEL_BLRP_EXPORT_TIMEOUT environment variable is set, // and this option is not passed, that variable value will be used. // // By default, if an environment variable is not set, and this option is not // passed, 30s will be used. // The default value is also used when the provided value is less than one. func WithExportTimeout(d time.Duration) BatchProcessorOption { return batchOptionFunc(func(cfg batchConfig) batchConfig { cfg.expTimeout = newSetting(d) return cfg }) } // WithExportMaxBatchSize sets the maximum batch size of every export. // A batch will be split into multiple exports to not exceed this size. // // If the OTEL_BLRP_MAX_EXPORT_BATCH_SIZE environment variable is set, // and this option is not passed, that variable value will be used. // // By default, if an environment variable is not set, and this option is not // passed, 512 will be used. // The default value is also used when the provided value is less than one. func WithExportMaxBatchSize(size int) BatchProcessorOption { return batchOptionFunc(func(cfg batchConfig) batchConfig { cfg.expMaxBatchSize = newSetting(size) return cfg }) }