You've already forked opentelemetry-go
							
							
				mirror of
				https://github.com/open-telemetry/opentelemetry-go.git
				synced 2025-10-31 00:07:40 +02:00 
			
		
		
		
	Implement the SimpleProcessor (#5079)
* Implement the SimpleProcessor * Add BenchmarkSimpleProcessorOnEmit * Remove stale comment * Run benchmarks in parallel
This commit is contained in:
		| @@ -40,3 +40,11 @@ type Exporter interface { | ||||
| 	// appropriate error should be returned in these situations. | ||||
| 	ForceFlush(ctx context.Context) error | ||||
| } | ||||
|  | ||||
| type noopExporter struct{} | ||||
|  | ||||
| func (noopExporter) Export(context.Context, []Record) error { return nil } | ||||
|  | ||||
| func (noopExporter) Shutdown(context.Context) error { return nil } | ||||
|  | ||||
| func (noopExporter) ForceFlush(context.Context) error { return nil } | ||||
|   | ||||
| @@ -5,34 +5,38 @@ package log // import "go.opentelemetry.io/otel/sdk/log" | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"sync" | ||||
| ) | ||||
|  | ||||
| // Compile-time check SimpleProcessor implements Processor. | ||||
| var _ Processor = (*SimpleProcessor)(nil) | ||||
|  | ||||
| // SimpleProcessor is an processor that synchronously exports log records. | ||||
| type SimpleProcessor struct{} | ||||
| type SimpleProcessor struct { | ||||
| 	exporterMu sync.Mutex | ||||
| 	exporter   Exporter | ||||
| } | ||||
|  | ||||
| // NewSimpleProcessor is a simple Processor adapter. | ||||
| // | ||||
| // Any of the exporter's methods may be called concurrently with itself | ||||
| // or with other methods. It is the responsibility of the exporter to manage | ||||
| // this concurrency. | ||||
| // | ||||
| // This Processor is not recommended for production use. The synchronous | ||||
| // nature of this Processor make it good for testing, debugging, or | ||||
| // showing examples of other features, but it can be slow and have a high | ||||
| // computation resource usage overhead. [NewBatchingProcessor] is recommended | ||||
| // for production use instead. | ||||
| func NewSimpleProcessor(exporter Exporter) *SimpleProcessor { | ||||
| 	// TODO (#5062): Implement. | ||||
| 	return nil | ||||
| 	if exporter == nil { | ||||
| 		// Do not panic on nil exporter. | ||||
| 		exporter = noopExporter{} | ||||
| 	} | ||||
| 	return &SimpleProcessor{exporter: exporter} | ||||
| } | ||||
|  | ||||
| // OnEmit batches provided log record. | ||||
| func (s *SimpleProcessor) OnEmit(ctx context.Context, r Record) error { | ||||
| 	// TODO (#5062): Implement. | ||||
| 	return nil | ||||
| 	s.exporterMu.Lock() | ||||
| 	defer s.exporterMu.Unlock() | ||||
| 	return s.exporter.Export(ctx, []Record{r}) | ||||
| } | ||||
|  | ||||
| // Enabled returns true. | ||||
| @@ -42,12 +46,14 @@ func (s *SimpleProcessor) Enabled(context.Context, Record) bool { | ||||
|  | ||||
| // Shutdown shuts down the expoter. | ||||
| func (s *SimpleProcessor) Shutdown(ctx context.Context) error { | ||||
| 	// TODO (#5062): Implement. | ||||
| 	return nil | ||||
| 	s.exporterMu.Lock() | ||||
| 	defer s.exporterMu.Unlock() | ||||
| 	return s.exporter.Shutdown(ctx) | ||||
| } | ||||
|  | ||||
| // ForceFlush flushes the exporter. | ||||
| func (s *SimpleProcessor) ForceFlush(ctx context.Context) error { | ||||
| 	// TODO (#5062): Implement. | ||||
| 	return nil | ||||
| 	s.exporterMu.Lock() | ||||
| 	defer s.exporterMu.Unlock() | ||||
| 	return s.exporter.ForceFlush(ctx) | ||||
| } | ||||
|   | ||||
							
								
								
									
										113
									
								
								sdk/log/simple_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										113
									
								
								sdk/log/simple_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,113 @@ | ||||
| // Copyright The OpenTelemetry Authors | ||||
| // SPDX-License-Identifier: Apache-2.0 | ||||
|  | ||||
| package log_test | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"sync" | ||||
| 	"testing" | ||||
|  | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| 	"github.com/stretchr/testify/require" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/sdk/log" | ||||
| ) | ||||
|  | ||||
| type exporter struct { | ||||
| 	records []log.Record | ||||
|  | ||||
| 	exportCalled     bool | ||||
| 	shutdownCalled   bool | ||||
| 	forceFlushCalled bool | ||||
| } | ||||
|  | ||||
| func (e *exporter) Export(_ context.Context, r []log.Record) error { | ||||
| 	e.records = r | ||||
| 	e.exportCalled = true | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (e *exporter) Shutdown(context.Context) error { | ||||
| 	e.shutdownCalled = true | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (e *exporter) ForceFlush(context.Context) error { | ||||
| 	e.forceFlushCalled = true | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func TestSimpleProcessorOnEmit(t *testing.T) { | ||||
| 	e := new(exporter) | ||||
| 	s := log.NewSimpleProcessor(e) | ||||
|  | ||||
| 	var r log.Record | ||||
| 	r.SetSeverityText("test") | ||||
| 	_ = s.OnEmit(context.Background(), r) | ||||
|  | ||||
| 	require.True(t, e.exportCalled, "exporter Export not called") | ||||
| 	assert.Equal(t, []log.Record{r}, e.records) | ||||
| } | ||||
|  | ||||
| func TestSimpleProcessorEnabled(t *testing.T) { | ||||
| 	s := log.NewSimpleProcessor(nil) | ||||
| 	assert.True(t, s.Enabled(context.Background(), log.Record{})) | ||||
| } | ||||
|  | ||||
| func TestSimpleProcessorShutdown(t *testing.T) { | ||||
| 	e := new(exporter) | ||||
| 	s := log.NewSimpleProcessor(e) | ||||
| 	_ = s.Shutdown(context.Background()) | ||||
| 	require.True(t, e.shutdownCalled, "exporter Shutdown not called") | ||||
| } | ||||
|  | ||||
| func TestSimpleProcessorForceFlush(t *testing.T) { | ||||
| 	e := new(exporter) | ||||
| 	s := log.NewSimpleProcessor(e) | ||||
| 	_ = s.ForceFlush(context.Background()) | ||||
| 	require.True(t, e.forceFlushCalled, "exporter ForceFlush not called") | ||||
| } | ||||
|  | ||||
| func TestSimpleProcessorConcurrentSafe(t *testing.T) { | ||||
| 	const goRoutineN = 10 | ||||
|  | ||||
| 	var wg sync.WaitGroup | ||||
| 	wg.Add(goRoutineN) | ||||
|  | ||||
| 	var r log.Record | ||||
| 	r.SetSeverityText("test") | ||||
| 	ctx := context.Background() | ||||
| 	s := log.NewSimpleProcessor(nil) | ||||
| 	for i := 0; i < goRoutineN; i++ { | ||||
| 		go func() { | ||||
| 			defer wg.Done() | ||||
|  | ||||
| 			_ = s.OnEmit(ctx, r) | ||||
| 			_ = s.Enabled(ctx, r) | ||||
| 			_ = s.Shutdown(ctx) | ||||
| 			_ = s.ForceFlush(ctx) | ||||
| 		}() | ||||
| 	} | ||||
|  | ||||
| 	wg.Wait() | ||||
| } | ||||
|  | ||||
| func BenchmarkSimpleProcessorOnEmit(b *testing.B) { | ||||
| 	var r log.Record | ||||
| 	r.SetSeverityText("test") | ||||
| 	ctx := context.Background() | ||||
| 	s := log.NewSimpleProcessor(nil) | ||||
|  | ||||
| 	b.ReportAllocs() | ||||
| 	b.ResetTimer() | ||||
| 	b.RunParallel(func(pb *testing.PB) { | ||||
| 		var out error | ||||
|  | ||||
| 		for pb.Next() { | ||||
| 			out = s.OnEmit(ctx, r) | ||||
| 		} | ||||
|  | ||||
| 		_ = out | ||||
| 	}) | ||||
| } | ||||
		Reference in New Issue
	
	Block a user