1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-02-01 13:07:51 +02:00

SDK: span processor interface and simple span processor. (#117)

* SDK: SpanProcessor Interface.

* add simple span processor.

* rename span processor.

* fix logic to export or process span data.
This commit is contained in:
rghetia 2019-09-16 13:58:15 -07:00 committed by GitHub
parent 5df3c071e8
commit cb0d352ec6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 333 additions and 7 deletions

View File

@ -0,0 +1,47 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package trace
// SimpleSpanProcessor implements SpanProcessor interfaces. It is used by
// exporters to receive SpanData synchronously when span is finished.
type SimpleSpanProcessor struct {
exporter Exporter
}
var _ SpanProcessor = (*SimpleSpanProcessor)(nil)
// NewSimpleSpanProcessor creates a new instance of SimpleSpanProcessor
// for a given exporter.
func NewSimpleSpanProcessor(exporter Exporter) *SimpleSpanProcessor {
ssp := &SimpleSpanProcessor{
exporter: exporter,
}
return ssp
}
// OnStart method does nothing.
func (ssp *SimpleSpanProcessor) OnStart(sd *SpanData) {
}
// OnEnd method exports SpanData using associated exporter.
func (ssp *SimpleSpanProcessor) OnEnd(sd *SpanData) {
if ssp.exporter != nil {
ssp.exporter.ExportSpan(sd)
}
}
// Shutdown method does nothing. There is no data to cleanup.
func (ssp *SimpleSpanProcessor) Shutdown() {
}

View File

@ -0,0 +1,77 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package trace_test
import (
"context"
"testing"
"go.opentelemetry.io/api/core"
apitrace "go.opentelemetry.io/api/trace"
sdktrace "go.opentelemetry.io/sdk/trace"
)
type testExporter struct {
spans []*sdktrace.SpanData
}
func (t *testExporter) ExportSpan(s *sdktrace.SpanData) {
t.spans = append(t.spans, s)
}
var _ sdktrace.Exporter = (*testExporter)(nil)
func init() {
sdktrace.Register()
sdktrace.ApplyConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()})
}
func TestNewSimpleSpanProcessor(t *testing.T) {
ssp := sdktrace.NewSimpleSpanProcessor(&testExporter{})
if ssp == nil {
t.Errorf("Error creating new instance of SimpleSpanProcessor\n")
}
}
func TestNewSimpleSpanProcessorWithNilExporter(t *testing.T) {
ssp := sdktrace.NewSimpleSpanProcessor(nil)
if ssp == nil {
t.Errorf("Error creating new instance of SimpleSpanProcessor with nil Exporter\n")
}
}
func TestSimpleSpanProcessorOnEnd(t *testing.T) {
te := testExporter{}
ssp := sdktrace.NewSimpleSpanProcessor(&te)
if ssp == nil {
t.Errorf("Error creating new instance of SimpleSpanProcessor with nil Exporter\n")
}
sdktrace.RegisterSpanProcessor(ssp)
tid := core.TraceID{High: 0x0102030405060708, Low: 0x0102040810203040}
sid := uint64(0x0102040810203040)
sc := core.SpanContext{
TraceID: tid,
SpanID: sid,
TraceOptions: 0x1,
}
_, span := apitrace.GlobalTracer().Start(context.Background(), "OnEnd", apitrace.ChildOf(sc))
span.Finish()
wantTraceID := tid
gotTraceID := te.spans[0].SpanContext.TraceID
if wantTraceID != gotTraceID {
t.Errorf("SimplerSpanProcessor OnEnd() check: got %+v, want %+v\n", gotTraceID, wantTraceID)
}
}

View File

@ -123,23 +123,26 @@ func (s *span) Finish(options ...apitrace.FinishOption) {
}
s.endOnce.Do(func() {
exp, _ := exporters.Load().(exportersMap)
mustExport := s.spanContext.IsSampled() && len(exp) > 0
//if s.spanStore != nil || mustExport {
if mustExport {
sps, _ := spanProcessors.Load().(spanProcessorMap)
mustExportOrProcess := len(sps) > 0 || (s.spanContext.IsSampled() && len(exp) > 0)
// TODO(rghetia): when exporter is migrated to use processors simply check for the number
// of processors. Exporter will export based on sampling.
if mustExportOrProcess {
sd := s.makeSpanData()
if opts.FinishTime.IsZero() {
sd.EndTime = internal.MonotonicEndTime(sd.StartTime)
} else {
sd.EndTime = opts.FinishTime
}
//if s.spanStore != nil {
// s.spanStore.finished(s, sd)
//}
if mustExport {
// Sampling check would be in the processor if the processor is used for exporting.
if s.spanContext.IsSampled() {
for e := range exp {
e.ExportSpan(sd)
}
}
for sp := range sps {
sp.OnEnd(sd)
}
}
})
}

View File

@ -0,0 +1,74 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package trace
import (
"sync"
"sync/atomic"
)
// SpanProcessor is interface to add hooks to start and end method invocations.
type SpanProcessor interface {
// OnStart method is invoked when span is started. It is a synchronous call
// and hence should not block.
OnStart(sd *SpanData)
// OnEnd method is invoked when span is finished. It is a synchronous call
// and hence should not block.
OnEnd(sd *SpanData)
// Shutdown is invoked when SDK shutsdown. Use this call to cleanup any processor
// data. No calls to OnStart and OnEnd method is invoked after Shutdown call is
// made. It should not be blocked indefinitely.
Shutdown()
}
type spanProcessorMap map[SpanProcessor]struct{}
var (
mu sync.Mutex
spanProcessors atomic.Value
)
// RegisterSpanProcessor adds to the list of SpanProcessors that will receive sampled
// trace spans.
func RegisterSpanProcessor(e SpanProcessor) {
mu.Lock()
defer mu.Unlock()
new := make(spanProcessorMap)
if old, ok := spanProcessors.Load().(spanProcessorMap); ok {
for k, v := range old {
new[k] = v
}
}
new[e] = struct{}{}
spanProcessors.Store(new)
}
// UnregisterSpanProcessor removes from the list of SpanProcessors the SpanProcessor that was
// registered with the given name.
func UnregisterSpanProcessor(e SpanProcessor) {
mu.Lock()
defer mu.Unlock()
new := make(spanProcessorMap)
if old, ok := spanProcessors.Load().(spanProcessorMap); ok {
for k, v := range old {
new[k] = v
}
}
delete(new, e)
spanProcessors.Store(new)
}

View File

@ -0,0 +1,118 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package trace_test
import (
"context"
"testing"
apitrace "go.opentelemetry.io/api/trace"
sdktrace "go.opentelemetry.io/sdk/trace"
)
type testSpanProcesor struct {
spansStarted []*sdktrace.SpanData
spansEnded []*sdktrace.SpanData
shutdownCount int
}
func init() {
sdktrace.Register()
sdktrace.ApplyConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()})
}
func (t *testSpanProcesor) OnStart(s *sdktrace.SpanData) {
t.spansStarted = append(t.spansStarted, s)
}
func (t *testSpanProcesor) OnEnd(s *sdktrace.SpanData) {
t.spansEnded = append(t.spansEnded, s)
}
func (t *testSpanProcesor) Shutdown() {
t.shutdownCount++
}
func TestRegisterSpanProcessort(t *testing.T) {
name := "Register span processor before span starts"
sp := NewTestSpanProcessor()
sdktrace.RegisterSpanProcessor(sp)
defer sdktrace.UnregisterSpanProcessor(sp)
_, span := apitrace.GlobalTracer().Start(context.Background(), "OnStart")
span.Finish()
wantCount := 1
gotCount := len(sp.spansStarted)
if gotCount != wantCount {
t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount)
}
gotCount = len(sp.spansEnded)
if gotCount != wantCount {
t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount)
}
}
func TestUnregisterSpanProcessor(t *testing.T) {
name := "Start span after unregistering span processor"
sp := NewTestSpanProcessor()
sdktrace.RegisterSpanProcessor(sp)
_, span := apitrace.GlobalTracer().Start(context.Background(), "OnStart")
span.Finish()
sdktrace.UnregisterSpanProcessor(sp)
// start another span after unregistering span processor.
_, span = apitrace.GlobalTracer().Start(context.Background(), "Start span after unregister")
span.Finish()
wantCount := 1
gotCount := len(sp.spansStarted)
if gotCount != wantCount {
t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount)
}
gotCount = len(sp.spansEnded)
if gotCount != wantCount {
t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount)
}
}
func TestUnregisterSpanProcessorWhileSpanIsActive(t *testing.T) {
name := "Unregister span processor while span is active"
sp := NewTestSpanProcessor()
sdktrace.RegisterSpanProcessor(sp)
_, span := apitrace.GlobalTracer().Start(context.Background(), "OnStart")
sdktrace.UnregisterSpanProcessor(sp)
span.Finish()
wantCount := 1
gotCount := len(sp.spansStarted)
if gotCount != wantCount {
t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount)
}
wantCount = 0
gotCount = len(sp.spansEnded)
if gotCount != wantCount {
t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount)
}
}
// TODO(rghetia): Add Shutdown test when it is implemented.
func TestShutdown(t *testing.T) {
}
func NewTestSpanProcessor() *testSpanProcesor {
return &testSpanProcesor{}
}

View File

@ -61,6 +61,13 @@ func (tr *tracer) Start(ctx context.Context, name string, o ...apitrace.SpanOpti
span := startSpanInternal(name, parent, remoteParent, opts)
span.tracer = tr
if span.IsRecordingEvents() {
sps, _ := spanProcessors.Load().(spanProcessorMap)
for sp := range sps {
sp.OnStart(span.data)
}
}
ctx, end := startExecutionTracerTask(ctx, name)
span.executionTracerTaskEnd = end
return newContext(ctx, span), span