1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-18 03:22:12 +02:00
opentelemetry-go/sdk/log/exporter.go
Tyler Yahn 4af9c20a80
Implement the BatchingProcessor (#5093)
* [WIP] Implement the BatchingProcessor

* Add TestExportSync

* Add TestChunker

* Test export error default to ErrorHandler

* Fix lint

* Fix chunk smaller than size error

* Add batch tests

* Fix lint

* Update OnEmit test

Check the len of records in eventually assertion given that is what we
are going to measure.

* Revert unneeded change to BatchingProcessor doc

* Add batch type

* Refactor testing of batching config

The BatchingProcessor is not expected to ultimately contain
configuration fields for queue size or export parameters (see #5093).
This will break TestNewBatchingProcessorConfiguration which tests the
configuration by evaluating the BatchingProcessor directly.

Instead, test the batchingConfig and rename the test to
TestNewBatchingConfig to match what is being tested.

* Implement the BatchingProcessor without polling

* Add TestBatchingProcessor

* Add ConcurrentSafe test

* Expand Shutdown tests

* Test context canceled for ForceFlush

* Refactor batch to queue

* Use exportSync

* Update docs and naming

* Split buffered export to its own type

* Update comments and naming

* Fix lint

* Remove redundant triggered type

* Add interval polling

* Refactor test structure

* Add custom ring implimementation

* Add BenchmarkBatchingProcessor

* Fix merge

* Remove custom ring impl

* Remove BenchmarkBatchingProcessor

* Update dev docs

* Test nil exporter

* Update OnEmit test

Ensure the poll goroutine will completely flush the queue of batches.

* Test RetriggerFlushNonBlocking

* Update ascii diagram

* Fix flaky OnEmit

* Revert unnecessary change to test pkg name

* Use batching term in docs

* Document EnqueueExport

* Return from EnqueueExport if blocked

Do not wait for the enqueue to succeed.

* Do not drop failed flush log records

* Use cancelable ctx in concurrency test

* Fix comments

* Apply feedback

Do not spawn a goroutine for the flush operation.

* Return true from EnqueueExport when stopped

* Update sdk/log/batch.go

Co-authored-by: Robert Pająk <pellared@hotmail.com>

* Remove TODO

* Comment re-trigger in poll

---------

Co-authored-by: Robert Pająk <pellared@hotmail.com>
2024-04-18 07:48:19 -07:00

317 lines
8.7 KiB
Go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel"
)
// Exporter handles the delivery of log records to external receivers.
//
// Any of the Exporter's methods may be called concurrently with itself
// or with other methods. It is the responsibility of the Exporter to manage
// this concurrency.
type Exporter interface {
// Export transmits log records to a receiver.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
//
// All retry logic must be contained in this function. The SDK does not
// implement any retry logic. All errors returned by this function are
// considered unrecoverable and will be reported to a configured error
// Handler.
//
// Implementations must not retain the records slice.
//
// Before modifying a Record, the implementation must use Record.Clone
// to create a copy that shares no state with the original.
Export(ctx context.Context, records []Record) error
// Shutdown is called when the SDK shuts down. Any cleanup or release of
// resources held by the exporter should be done in this call.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
//
// After Shutdown is called, calls to Export, Shutdown, or ForceFlush
// should perform no operation and return nil error.
Shutdown(ctx context.Context) error
// ForceFlush exports log records to the configured Exporter that have not yet
// been exported.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
ForceFlush(ctx context.Context) error
}
var defaultNoopExporter = &noopExporter{}
type noopExporter struct{}
func (noopExporter) Export(context.Context, []Record) error { return nil }
func (noopExporter) Shutdown(context.Context) error { return nil }
func (noopExporter) ForceFlush(context.Context) error { return nil }
// chunkExporter wraps an Exporter's Export method so it is called with
// appropriately sized export payloads. Any payload larger than a defined size
// is chunked into smaller payloads and exported sequentially.
type chunkExporter struct {
Exporter
// size is the maximum batch size exported.
size int
}
// newChunkExporter wraps exporter. Calls to the Export will have their records
// payload chuncked so they do not exceed size. If size is less than or equal
// to 0, exporter is returned directly.
func newChunkExporter(exporter Exporter, size int) Exporter {
if size <= 0 {
return exporter
}
return &chunkExporter{Exporter: exporter, size: size}
}
// Export exports records in chuncks no larger than c.size.
func (c chunkExporter) Export(ctx context.Context, records []Record) error {
n := len(records)
for i, j := 0, min(c.size, n); i < n; i, j = i+c.size, min(j+c.size, n) {
if err := c.Exporter.Export(ctx, records[i:j]); err != nil {
return err
}
}
return nil
}
// timeoutExporter wraps an Exporter and ensures any call to Export will have a
// timeout for the context.
type timeoutExporter struct {
Exporter
// timeout is the maximum time an export is attempted.
timeout time.Duration
}
// newTimeoutExporter wraps exporter with an Exporter that limits the context
// lifetime passed to Export to be timeout. If timeout is less than or equal to
// zero, exporter will be returned directly.
func newTimeoutExporter(exp Exporter, timeout time.Duration) Exporter {
if timeout <= 0 {
return exp
}
return &timeoutExporter{Exporter: exp, timeout: timeout}
}
// Export sets the timeout of ctx before calling the Exporter e wraps.
func (e *timeoutExporter) Export(ctx context.Context, records []Record) error {
ctx, cancel := context.WithTimeout(ctx, e.timeout)
defer cancel()
return e.Exporter.Export(ctx, records)
}
// exportSync exports all data from input using exporter in a spawned
// goroutine. The returned chan will be closed when the spawned goroutine
// completes.
func exportSync(input <-chan exportData, exporter Exporter) (done chan struct{}) {
done = make(chan struct{})
go func() {
defer close(done)
for data := range input {
data.DoExport(exporter.Export)
}
}()
return done
}
// exportData is data related to an export.
type exportData struct {
ctx context.Context
records []Record
// respCh is the channel any error returned from the export will be sent
// on. If this is nil, and the export error is non-nil, the error will
// passed to the OTel error handler.
respCh chan<- error
}
// DoExport calls exportFn with the data contained in e. The error response
// will be returned on e's respCh if not nil. The error will be handled by the
// default OTel error handle if it is not nil and respCh is nil or full.
func (e exportData) DoExport(exportFn func(context.Context, []Record) error) {
if len(e.records) == 0 {
e.respond(nil)
return
}
e.respond(exportFn(e.ctx, e.records))
}
func (e exportData) respond(err error) {
select {
case e.respCh <- err:
default:
// e.respCh is nil or busy, default to otel.Handler.
if err != nil {
otel.Handle(err)
}
}
}
// bufferExporter provides asynchronous and synchronous export functionality by
// buffering export requests.
type bufferExporter struct {
Exporter
input chan exportData
inputMu sync.Mutex
done chan struct{}
stopped atomic.Bool
}
// newBufferExporter returns a new bufferExporter that wraps exporter. The
// returned bufferExporter will buffer at most size number of export requests.
// If size is less than zero, zero will be used (i.e. only synchronous
// exporting will be supported).
func newBufferExporter(exporter Exporter, size int) *bufferExporter {
if size < 0 {
size = 0
}
input := make(chan exportData, size)
return &bufferExporter{
Exporter: exporter,
input: input,
done: exportSync(input, exporter),
}
}
var errStopped = errors.New("exporter stopped")
func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan<- error) error {
data := exportData{ctx, records, rCh}
e.inputMu.Lock()
defer e.inputMu.Unlock()
// Check stopped before enqueueing now that e.inputMu is held. This
// prevents sends on a closed chan when Shutdown is called concurrently.
if e.stopped.Load() {
return errStopped
}
select {
case e.input <- data:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// EnqueueExport enqueues an export of records in the context of ctx to be
// performed asynchronously. This will return true if the records are
// successfully enqueued (or the bufferExporter is shut down), false otherwise.
//
// The passed records are held after this call returns.
func (e *bufferExporter) EnqueueExport(records []Record) bool {
if len(records) == 0 {
// Nothing to enqueue, do not waste input space.
return true
}
data := exportData{ctx: context.Background(), records: records}
e.inputMu.Lock()
defer e.inputMu.Unlock()
// Check stopped before enqueueing now that e.inputMu is held. This
// prevents sends on a closed chan when Shutdown is called concurrently.
if e.stopped.Load() {
return true
}
select {
case e.input <- data:
return true
default:
return false
}
}
// Export synchronously exports records in the context of ctx. This will not
// return until the export has been completed.
func (e *bufferExporter) Export(ctx context.Context, records []Record) error {
if len(records) == 0 {
return nil
}
resp := make(chan error, 1)
err := e.enqueue(ctx, records, resp)
if err != nil {
if errors.Is(err, errStopped) {
return nil
}
return fmt.Errorf("%w: dropping %d records", err, len(records))
}
select {
case err := <-resp:
return err
case <-ctx.Done():
return ctx.Err()
}
}
// ForceFlush flushes buffered exports. Any existing exports that is buffered
// is flushed before this returns.
func (e *bufferExporter) ForceFlush(ctx context.Context) error {
resp := make(chan error, 1)
err := e.enqueue(ctx, nil, resp)
if err != nil {
if errors.Is(err, errStopped) {
return nil
}
return err
}
select {
case <-resp:
case <-ctx.Done():
return ctx.Err()
}
return e.Exporter.ForceFlush(ctx)
}
// Shutdown shuts down e.
//
// Any buffered exports are flushed before this returns.
//
// All calls to EnqueueExport or Exporter will return nil without any export
// after this is called.
func (e *bufferExporter) Shutdown(ctx context.Context) error {
if e.stopped.Swap(true) {
return nil
}
e.inputMu.Lock()
defer e.inputMu.Unlock()
// No more sends will be made.
close(e.input)
select {
case <-e.done:
case <-ctx.Done():
return errors.Join(ctx.Err(), e.Exporter.Shutdown(ctx))
}
return e.Exporter.Shutdown(ctx)
}