diff --git a/example/basic/main.go b/example/basic/main.go index 18973e223..9a3421b40 100644 --- a/example/basic/main.go +++ b/example/basic/main.go @@ -23,7 +23,6 @@ import ( "go.opentelemetry.io/api/stats" "go.opentelemetry.io/api/tag" "go.opentelemetry.io/api/trace" - "go.opentelemetry.io/experimental/streaming/sdk/event" ) var ( @@ -64,7 +63,7 @@ func main() { err := tracer.WithSpan(ctx, "operation", func(ctx context.Context) error { - trace.CurrentSpan(ctx).AddEvent(ctx, event.WithAttr("Nice operation!", key.New("bogons").Int(100))) + trace.CurrentSpan(ctx).Event(ctx, "Nice operation!", key.New("bogons").Int(100)) trace.CurrentSpan(ctx).SetAttributes(anotherKey.String("yes")) @@ -76,7 +75,7 @@ func main() { func(ctx context.Context) error { trace.CurrentSpan(ctx).SetAttribute(lemonsKey.String("five")) - trace.CurrentSpan(ctx).AddEvent(ctx, event.WithString("Format schmormat %d!", 100)) + trace.CurrentSpan(ctx).Event(ctx, "Sub span event") stats.Record(ctx, measureTwo.M(1.3)) diff --git a/experimental/streaming/exporter/observer/observer.go b/experimental/streaming/exporter/observer/observer.go index 96d899733..26248656e 100644 --- a/experimental/streaming/exporter/observer/observer.go +++ b/experimental/streaming/exporter/observer/observer.go @@ -150,7 +150,14 @@ func Foreach(f func(Observer)) { } } -func NewScope(parent ScopeID, kv ...core.KeyValue) ScopeID { - // TODO - return parent +func NewScope(parent ScopeID, attributes ...core.KeyValue) ScopeID { + eventID := Record(Event{ + Type: NEW_SCOPE, + Scope: parent, + Attributes: attributes, + }) + return ScopeID{ + EventID: eventID, + SpanContext: parent.SpanContext, + } } diff --git a/experimental/streaming/exporter/reader/format/format.go b/experimental/streaming/exporter/reader/format/format.go index ab8cf3fb2..a8534d84c 100644 --- a/experimental/streaming/exporter/reader/format/format.go +++ b/experimental/streaming/exporter/reader/format/format.go @@ -21,7 +21,9 @@ import ( "go.opentelemetry.io/api/core" "go.opentelemetry.io/api/key" "go.opentelemetry.io/experimental/streaming/exporter/reader" - "go.opentelemetry.io/experimental/streaming/sdk/trace" + + // TODO this should not be an SDK dependency; move conventional tags into the API. + "go.opentelemetry.io/experimental/streaming/sdk" ) var ( @@ -119,10 +121,10 @@ func AppendEvent(buf *strings.Builder, data reader.Event) { data.Tags.Foreach(f(true)) } if data.SpanContext.HasSpanID() { - f(false)(trace.SpanIDKey.String(data.SpanContext.SpanIDString())) + f(false)(sdk.SpanIDKey.String(data.SpanContext.SpanIDString())) } if data.SpanContext.HasTraceID() { - f(false)(trace.TraceIDKey.String(data.SpanContext.TraceIDString())) + f(false)(sdk.TraceIDKey.String(data.SpanContext.TraceIDString())) } buf.WriteString(" ]\n") diff --git a/experimental/streaming/exporter/reader/reader.go b/experimental/streaming/exporter/reader/reader.go index 7ddd01692..1f05d0a36 100644 --- a/experimental/streaming/exporter/reader/reader.go +++ b/experimental/streaming/exporter/reader/reader.go @@ -117,6 +117,11 @@ func NewReaderObserver(readers ...Reader) observer.Observer { } func (ro *readerObserver) Observe(event observer.Event) { + // TODO this should check for out-of-order events and buffer. + ro.orderedObserve(event) +} + +func (ro *readerObserver) orderedObserve(event observer.Event) { read := Event{ Time: event.Time, Sequence: event.Sequence, @@ -169,7 +174,7 @@ func (ro *readerObserver) Observe(event observer.Event) { case observer.FINISH_SPAN: attrs, span := ro.readScope(event.Scope) if span == nil { - panic("span not found") + panic(fmt.Sprint("span not found", event.Scope)) } read.Name = span.name diff --git a/experimental/streaming/exporter/spanlog/spanlog.go b/experimental/streaming/exporter/spanlog/spanlog.go index a7dbefdec..14e7604e3 100644 --- a/experimental/streaming/exporter/spanlog/spanlog.go +++ b/experimental/streaming/exporter/spanlog/spanlog.go @@ -18,7 +18,6 @@ import ( "os" "strings" - "go.opentelemetry.io/experimental/streaming/exporter/buffer" "go.opentelemetry.io/experimental/streaming/exporter/observer" "go.opentelemetry.io/experimental/streaming/exporter/spandata" "go.opentelemetry.io/experimental/streaming/exporter/spandata/format" @@ -27,7 +26,7 @@ import ( type spanLog struct{} func New() observer.Observer { - return buffer.NewBuffer(1000, spandata.NewReaderObserver(&spanLog{})) + return spandata.NewReaderObserver(&spanLog{}) } func (s *spanLog) Read(data *spandata.Span) { diff --git a/experimental/streaming/sdk/event/event.go b/experimental/streaming/sdk/event/event.go deleted file mode 100644 index 664e88fe1..000000000 --- a/experimental/streaming/sdk/event/event.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package event - -import ( - "fmt" - - "go.opentelemetry.io/api/core" - apievent "go.opentelemetry.io/api/event" -) - -type event struct { - message string - attributes []core.KeyValue -} - -var _ apievent.Event = (*event)(nil) - -// WithAttr creates an Event with Attributes and a message. -// Attributes are immutable. -func WithAttr(msg string, attributes ...core.KeyValue) apievent.Event { - return event{message: msg, attributes: attributes} -} - -// WithString creates an Event with formatted string. -func WithString(f string, args ...interface{}) apievent.Event { - return event{message: fmt.Sprint(f, args), attributes: nil} -} - -func (e event) Message() string { - return e.message -} - -func (e event) Attributes() []core.KeyValue { - return append(e.attributes[:0:0], e.attributes...) -} diff --git a/experimental/streaming/sdk/package.go b/experimental/streaming/sdk/package.go new file mode 100644 index 000000000..b21c00b4d --- /dev/null +++ b/experimental/streaming/sdk/package.go @@ -0,0 +1,9 @@ +package sdk + +import ( + "go.opentelemetry.io/api/trace" +) + +func init() { + trace.SetGlobalTracer(New()) +} diff --git a/experimental/streaming/sdk/trace/span.go b/experimental/streaming/sdk/span.go similarity index 61% rename from experimental/streaming/sdk/trace/span.go rename to experimental/streaming/sdk/span.go index 3d5be4feb..9dc155115 100644 --- a/experimental/streaming/sdk/trace/span.go +++ b/experimental/streaming/sdk/span.go @@ -12,11 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -package trace +package sdk import ( "context" - "sync" "google.golang.org/grpc/codes" @@ -28,22 +27,14 @@ import ( ) type span struct { - tracer *tracer - spanContext core.SpanContext - lock sync.Mutex - eventID observer.EventID - finishOnce sync.Once - recordEvent bool - status codes.Code + tracer *tracer + initial observer.ScopeID } // SpancContext returns span context of the span. Return SpanContext is usable // even after the span is finished. func (sp *span) SpanContext() core.SpanContext { - if sp == nil { - return core.INVALID_SPAN_CONTEXT - } - return sp.spanContext + return sp.initial.SpanContext } // IsRecordingEvents returns true is the span is active and recording events is enabled. @@ -53,122 +44,55 @@ func (sp *span) IsRecordingEvents() bool { // SetStatus sets the status of the span. func (sp *span) SetStatus(status codes.Code) { - if sp == nil { - return - } - sid := sp.ScopeID() - observer.Record(observer.Event{ - Type: observer.SET_STATUS, - Scope: sid, - Sequence: sid.EventID, - Status: status, + Type: observer.SET_STATUS, + Scope: sp.ScopeID(), + Status: status, }) - sp.status = status } func (sp *span) ScopeID() observer.ScopeID { - if sp == nil { - return observer.ScopeID{} - } - sp.lock.Lock() - sid := observer.ScopeID{ - EventID: sp.eventID, - SpanContext: sp.spanContext, - } - sp.lock.Unlock() - return sid -} - -func (sp *span) updateScope() (observer.ScopeID, observer.EventID) { - next := observer.NextEventID() - - sp.lock.Lock() - sid := observer.ScopeID{ - EventID: sp.eventID, - SpanContext: sp.spanContext, - } - sp.eventID = next - sp.lock.Unlock() - - return sid, next -} - -func (sp *span) SetError(v bool) { - sp.SetAttribute(ErrorKey.Bool(v)) + return sp.initial } func (sp *span) SetAttribute(attribute core.KeyValue) { - if sp == nil { - return - } - - sid, next := sp.updateScope() - observer.Record(observer.Event{ Type: observer.MODIFY_ATTR, - Scope: sid, - Sequence: next, + Scope: sp.ScopeID(), Attribute: attribute, }) } func (sp *span) SetAttributes(attributes ...core.KeyValue) { - if sp == nil { - return - } - - sid, next := sp.updateScope() - observer.Record(observer.Event{ Type: observer.MODIFY_ATTR, - Scope: sid, - Sequence: next, + Scope: sp.ScopeID(), Attributes: attributes, }) } func (sp *span) ModifyAttribute(mutator tag.Mutator) { - if sp == nil { - return - } - - sid, next := sp.updateScope() - observer.Record(observer.Event{ - Type: observer.MODIFY_ATTR, - Scope: sid, - Sequence: next, - Mutator: mutator, + Type: observer.MODIFY_ATTR, + Scope: sp.ScopeID(), + Mutator: mutator, }) } func (sp *span) ModifyAttributes(mutators ...tag.Mutator) { - if sp == nil { - return - } - - sid, next := sp.updateScope() - observer.Record(observer.Event{ Type: observer.MODIFY_ATTR, - Scope: sid, - Sequence: next, + Scope: sp.ScopeID(), Mutators: mutators, }) } func (sp *span) Finish() { - if sp == nil { - return - } recovered := recover() - sp.finishOnce.Do(func() { - observer.Record(observer.Event{ - Type: observer.FINISH_SPAN, - Scope: sp.ScopeID(), - Recovered: recovered, - }) + observer.Record(observer.Event{ + Type: observer.FINISH_SPAN, + Scope: sp.ScopeID(), + Recovered: recovered, }) if recovered != nil { panic(recovered) diff --git a/experimental/streaming/sdk/trace/trace.go b/experimental/streaming/sdk/trace.go similarity index 69% rename from experimental/streaming/sdk/trace/trace.go rename to experimental/streaming/sdk/trace.go index 3e5f6edf7..c765e7c2f 100644 --- a/experimental/streaming/sdk/trace/trace.go +++ b/experimental/streaming/sdk/trace.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package trace +package sdk import ( "context" @@ -24,7 +24,6 @@ import ( "go.opentelemetry.io/api/trace" apitrace "go.opentelemetry.io/api/trace" "go.opentelemetry.io/experimental/streaming/exporter/observer" - "go.opentelemetry.io/experimental/streaming/sdk/event" ) type tracer struct { @@ -32,6 +31,7 @@ type tracer struct { } var ( + // TODO These should move somewhere in the api, right? ServiceKey = key.New("service") ComponentKey = key.New("component") ErrorKey = key.New("error") @@ -42,20 +42,17 @@ var ( ) ) -var t = &tracer{} - -// Register registers tracer to global registry and returns the registered tracer. -func Register() apitrace.Tracer { - apitrace.SetGlobalTracer(t) - return t +func New() trace.Tracer { + return &tracer{} } func (t *tracer) WithResources(attributes ...core.KeyValue) apitrace.Tracer { - return t - // s := scope.New(t.resources.Scope(), attributes...) - // return &tracer{ - // resources: s.ScopeID().EventID, - // } + s := observer.NewScope(observer.ScopeID{ + EventID: t.resources, + }, attributes...) + return &tracer{ + resources: s.EventID, + } } func (t *tracer) WithComponent(name string) apitrace.Tracer { @@ -74,18 +71,12 @@ func (t *tracer) WithSpan(ctx context.Context, name string, body func(context.Co if err := body(ctx); err != nil { span.SetAttribute(ErrorKey.Bool(true)) - span.AddEvent(ctx, event.WithAttr("span error", MessageKey.String(err.Error()))) + span.Event(ctx, "span error", MessageKey.String(err.Error())) return err } return nil } -// Start starts a new span with provided name and span options. -// If parent span reference is provided in the span option then it is used as as parent. -// Otherwise, parent span reference is retrieved from current context. -// The new span uses the same TraceID as parent. -// If no parent is found then a root span is created and started with random TraceID. -// TODO: Add sampling logic. func (t *tracer) Start(ctx context.Context, name string, opts ...apitrace.SpanOption) (context.Context, apitrace.Span) { var child core.SpanContext @@ -102,8 +93,7 @@ func (t *tracer) Start(ctx context.Context, name string, opts ...apitrace.SpanOp if o.Reference.HasTraceID() { parentScope.SpanContext = o.Reference.SpanContext } else { - parentSpan, _ := apitrace.CurrentSpan(ctx).(*span) - parentScope = parentSpan.ScopeID() + parentScope.SpanContext = apitrace.CurrentSpan(ctx).SpanContext() } if parentScope.HasTraceID() { @@ -121,17 +111,19 @@ func (t *tracer) Start(ctx context.Context, name string, opts ...apitrace.SpanOp } span := &span{ - spanContext: child, - tracer: t, - recordEvent: o.RecordEvent, - eventID: observer.Record(observer.Event{ - Time: o.StartTime, - Type: observer.START_SPAN, - Scope: observer.NewScope(childScope, o.Attributes...), - Context: ctx, - Parent: parentScope, - String: name, - }), + tracer: t, + initial: observer.ScopeID{ + SpanContext: child, + EventID: observer.Record(observer.Event{ + Time: o.StartTime, + Type: observer.START_SPAN, + Scope: observer.NewScope(childScope, o.Attributes...), + Context: ctx, + Parent: parentScope, + String: name, + }, + ), + }, } return trace.SetCurrentSpan(ctx, span), span }