1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2026-06-03 18:35:08 +02:00

Implement otlploggrpc exporter (#5582)

Part of #5056

It also abstracts some test help functions from the client and adjusts
the indent of `UploadLogs` and `PartialSuccess` in client tests.

For full usage of this exporter, check
https://github.com/open-telemetry/opentelemetry-go/pull/5522

---------

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
Sam Xie
2024-07-09 12:37:01 -07:00
committed by GitHub
parent fa00fc5751
commit 4816927e03
4 changed files with 293 additions and 95 deletions
@@ -204,6 +204,16 @@ func (c *client) exportContext(parent context.Context) (context.Context, context
return ctx, cancel
}
type noopClient struct{}
func newNoopClient() *noopClient {
return &noopClient{}
}
func (c *noopClient) UploadLogs(context.Context, []*logpb.ResourceLogs) error { return nil }
func (c *noopClient) Shutdown(context.Context) error { return nil }
// retryable returns if err identifies a request that can be retried and a
// duration to wait for if an explicit throttle time is included in err.
func retryable(err error) (bool, time.Duration) {
@@ -450,113 +450,114 @@ func (c *grpcCollector) Collect() *storage {
return c.storage
}
func TestClient(t *testing.T) {
factory := func(rCh <-chan exportResult) (*client, *grpcCollector) {
coll, err := newGRPCCollector("", rCh)
require.NoError(t, err)
func clientFactory(t *testing.T, rCh <-chan exportResult) (*client, *grpcCollector) {
t.Helper()
coll, err := newGRPCCollector("", rCh)
require.NoError(t, err)
addr := coll.listener.Addr().String()
opts := []Option{WithEndpoint(addr), WithInsecure()}
cfg := newConfig(opts)
client, err := newClient(cfg)
require.NoError(t, err)
return client, coll
addr := coll.listener.Addr().String()
opts := []Option{WithEndpoint(addr), WithInsecure()}
cfg := newConfig(opts)
client, err := newClient(cfg)
require.NoError(t, err)
return client, coll
}
func testCtxErrs(factory func() func(context.Context) error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
t.Run("DeadlineExceeded", func(t *testing.T) {
innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
t.Cleanup(innerCancel)
<-innerCtx.Done()
f := factory()
assert.ErrorIs(t, f(innerCtx), context.DeadlineExceeded)
})
t.Run("Canceled", func(t *testing.T) {
innerCtx, innerCancel := context.WithCancel(ctx)
innerCancel()
f := factory()
assert.ErrorIs(t, f(innerCtx), context.Canceled)
})
}
}
func TestClient(t *testing.T) {
t.Run("ClientHonorsContextErrors", func(t *testing.T) {
testCtxErrs := func(factory func() func(context.Context) error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
t.Run("DeadlineExceeded", func(t *testing.T) {
innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
t.Cleanup(innerCancel)
<-innerCtx.Done()
f := factory()
assert.ErrorIs(t, f(innerCtx), context.DeadlineExceeded)
})
t.Run("Canceled", func(t *testing.T) {
innerCtx, innerCancel := context.WithCancel(ctx)
innerCancel()
f := factory()
assert.ErrorIs(t, f(innerCtx), context.Canceled)
})
}
}
t.Run("Shutdown", testCtxErrs(func() func(context.Context) error {
c, _ := factory(nil)
c, _ := clientFactory(t, nil)
return c.Shutdown
}))
t.Run("UploadLog", testCtxErrs(func() func(context.Context) error {
c, _ := factory(nil)
c, _ := clientFactory(t, nil)
return func(ctx context.Context) error {
return c.UploadLogs(ctx, nil)
}
}))
})
t.Run("UploadLogs", func(t *testing.T) {
ctx := context.Background()
client, coll := factory(nil)
t.Run("UploadLogs", func(t *testing.T) {
ctx := context.Background()
client, coll := clientFactory(t, nil)
require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.NoError(t, client.Shutdown(ctx))
got := coll.Collect().Dump()
require.Len(t, got, 1, "upload of one ResourceLogs")
diff := cmp.Diff(got[0], resourceLogs[0], cmp.Comparer(proto.Equal))
if diff != "" {
t.Fatalf("unexpected ResourceLogs:\n%s", diff)
}
})
require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.NoError(t, client.Shutdown(ctx))
got := coll.Collect().Dump()
require.Len(t, got, 1, "upload of one ResourceLogs")
diff := cmp.Diff(got[0], resourceLogs[0], cmp.Comparer(proto.Equal))
if diff != "" {
t.Fatalf("unexpected ResourceLogs:\n%s", diff)
}
})
t.Run("PartialSuccess", func(t *testing.T) {
const n, msg = 2, "bad data"
rCh := make(chan exportResult, 3)
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{
PartialSuccess: &collogpb.ExportLogsPartialSuccess{
RejectedLogRecords: n,
ErrorMessage: msg,
},
t.Run("PartialSuccess", func(t *testing.T) {
const n, msg = 2, "bad data"
rCh := make(chan exportResult, 3)
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{
PartialSuccess: &collogpb.ExportLogsPartialSuccess{
RejectedLogRecords: n,
ErrorMessage: msg,
},
}
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{
PartialSuccess: &collogpb.ExportLogsPartialSuccess{
// Should not be logged.
RejectedLogRecords: 0,
ErrorMessage: "",
},
},
}
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{
PartialSuccess: &collogpb.ExportLogsPartialSuccess{
// Should not be logged.
RejectedLogRecords: 0,
ErrorMessage: "",
},
}
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{},
}
},
}
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{},
}
ctx := context.Background()
client, _ := factory(rCh)
ctx := context.Background()
client, _ := clientFactory(t, rCh)
defer func(orig otel.ErrorHandler) {
otel.SetErrorHandler(orig)
}(otel.GetErrorHandler())
defer func(orig otel.ErrorHandler) {
otel.SetErrorHandler(orig)
}(otel.GetErrorHandler())
var errs []error
eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) })
otel.SetErrorHandler(eh)
var errs []error
eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) })
otel.SetErrorHandler(eh)
require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.Equal(t, 1, len(errs))
want := fmt.Sprintf("%s (%d log records rejected)", msg, n)
assert.ErrorContains(t, errs[0], want)
})
require.Equal(t, 1, len(errs))
want := fmt.Sprintf("%s (%d log records rejected)", msg, n)
assert.ErrorContains(t, errs[0], want)
})
}
+46 -10
View File
@@ -5,14 +5,27 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/o
import (
"context"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/transform"
"go.opentelemetry.io/otel/sdk/log"
logpb "go.opentelemetry.io/proto/otlp/logs/v1"
)
type logClient interface {
UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) error
Shutdown(context.Context) error
}
// Exporter is a OpenTelemetry log Exporter. It transports log data encoded as
// OTLP protobufs using gRPC.
type Exporter struct {
// TODO: implement.
// Ensure synchronous access to the client across all functionality.
clientMu sync.Mutex
client logClient
stopped atomic.Bool
}
// Compile-time check Exporter implements [log.Exporter].
@@ -25,29 +38,52 @@ func New(_ context.Context, options ...Option) (*Exporter, error) {
if err != nil {
return nil, err
}
return newExporter(c, cfg)
return newExporter(c), nil
}
func newExporter(*client, config) (*Exporter, error) {
// TODO: implement
return &Exporter{}, nil
func newExporter(c logClient) *Exporter {
var e Exporter
e.client = c
return &e
}
var transformResourceLogs = transform.ResourceLogs
// Export transforms and transmits log records to an OTLP receiver.
//
// This method returns nil and drops records if called after Shutdown.
// This method returns an error if the method is canceled by the passed context.
func (e *Exporter) Export(ctx context.Context, records []log.Record) error {
// TODO: implement.
return nil
if e.stopped.Load() {
return nil
}
otlp := transformResourceLogs(records)
if otlp == nil {
return nil
}
e.clientMu.Lock()
defer e.clientMu.Unlock()
return e.client.UploadLogs(ctx, otlp)
}
// Shutdown shuts down the Exporter. Calls to Export or ForceFlush will perform
// no operation after this is called.
func (e *Exporter) Shutdown(ctx context.Context) error {
// TODO: implement.
return nil
if e.stopped.Swap(true) {
return nil
}
e.clientMu.Lock()
defer e.clientMu.Unlock()
err := e.client.Shutdown(ctx)
e.client = newNoopClient()
return err
}
// ForceFlush does nothing. The Exporter holds no state.
func (e *Exporter) ForceFlush(ctx context.Context) error {
// TODO: implement.
return nil
}
@@ -0,0 +1,151 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otlploggrpc
import (
"context"
"errors"
"runtime"
"sync"
"sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/log"
sdklog "go.opentelemetry.io/otel/sdk/log"
logpb "go.opentelemetry.io/proto/otlp/logs/v1"
)
var records []sdklog.Record
func init() {
var r sdklog.Record
r.SetTimestamp(ts)
r.SetBody(log.StringValue("A"))
records = append(records, r)
r.SetBody(log.StringValue("B"))
records = append(records, r)
}
type mockClient struct {
err error
uploads int
}
func (m *mockClient) UploadLogs(context.Context, []*logpb.ResourceLogs) error {
m.uploads++
return m.err
}
func (m *mockClient) Shutdown(context.Context) error {
return m.err
}
func TestExporterExport(t *testing.T) {
errClient := errors.New("client")
testCases := []struct {
name string
logs []sdklog.Record
err error
wantLogs []sdklog.Record
wantErr error
}{
{
name: "NoError",
logs: make([]sdklog.Record, 2),
wantLogs: make([]sdklog.Record, 2),
},
{
name: "Error",
logs: make([]sdklog.Record, 2),
err: errClient,
wantErr: errClient,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
orig := transformResourceLogs
var got []sdklog.Record
transformResourceLogs = func(r []sdklog.Record) []*logpb.ResourceLogs {
got = r
return make([]*logpb.ResourceLogs, len(r))
}
t.Cleanup(func() { transformResourceLogs = orig })
mockCli := mockClient{err: tc.err}
e := newExporter(&mockCli)
err := e.Export(context.Background(), tc.logs)
assert.Equal(t, tc.wantErr, err)
assert.Equal(t, tc.logs, got)
assert.Equal(t, 1, mockCli.uploads)
})
}
}
func TestExporterShutdown(t *testing.T) {
ctx := context.Background()
e, err := New(ctx)
require.NoError(t, err, "New")
assert.NoError(t, e.Shutdown(ctx), "Shutdown Exporter")
// After Shutdown is called, calls to Export, Shutdown, or ForceFlush
// should perform no operation and return nil error.
r := make([]sdklog.Record, 1)
assert.NoError(t, e.Export(ctx, r), "Export on Shutdown Exporter")
assert.NoError(t, e.ForceFlush(ctx), "ForceFlush on Shutdown Exporter")
assert.NoError(t, e.Shutdown(ctx), "Shutdown on Shutdown Exporter")
}
func TestExporterForceFlush(t *testing.T) {
ctx := context.Background()
e, err := New(ctx)
require.NoError(t, err, "New")
assert.NoError(t, e.ForceFlush(ctx), "ForceFlush")
}
func TestExporterConcurrentSafe(t *testing.T) {
e := newExporter(&mockClient{})
const goroutines = 10
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
runs := new(uint64)
for i := 0; i < goroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
r := make([]sdklog.Record, 1)
for {
select {
case <-ctx.Done():
return
default:
_ = e.Export(ctx, r)
_ = e.ForceFlush(ctx)
atomic.AddUint64(runs, 1)
}
}
}()
}
for atomic.LoadUint64(runs) == 0 {
runtime.Gosched()
}
_ = e.Shutdown(ctx)
cancel()
wg.Wait()
}