1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-07-05 00:28:58 +02:00

Restore the experimental/streaming SDK implementation (#55)

* Fix streaming - part 1

* Eliminate span{} state

* Eliminate trace/ dir

* Avoid missing AddEvent helpers
This commit is contained in:
Joshua MacDonald
2019-07-17 13:59:53 -07:00
committed by rghetia
parent 2c77e484b4
commit b26d6675ed
9 changed files with 76 additions and 187 deletions

View File

@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/api/stats"
"go.opentelemetry.io/api/tag"
"go.opentelemetry.io/api/trace"
"go.opentelemetry.io/experimental/streaming/sdk/event"
)
var (
@ -64,7 +63,7 @@ func main() {
err := tracer.WithSpan(ctx, "operation", func(ctx context.Context) error {
trace.CurrentSpan(ctx).AddEvent(ctx, event.WithAttr("Nice operation!", key.New("bogons").Int(100)))
trace.CurrentSpan(ctx).Event(ctx, "Nice operation!", key.New("bogons").Int(100))
trace.CurrentSpan(ctx).SetAttributes(anotherKey.String("yes"))
@ -76,7 +75,7 @@ func main() {
func(ctx context.Context) error {
trace.CurrentSpan(ctx).SetAttribute(lemonsKey.String("five"))
trace.CurrentSpan(ctx).AddEvent(ctx, event.WithString("Format schmormat %d!", 100))
trace.CurrentSpan(ctx).Event(ctx, "Sub span event")
stats.Record(ctx, measureTwo.M(1.3))

View File

@ -150,7 +150,14 @@ func Foreach(f func(Observer)) {
}
}
func NewScope(parent ScopeID, kv ...core.KeyValue) ScopeID {
// TODO
return parent
func NewScope(parent ScopeID, attributes ...core.KeyValue) ScopeID {
eventID := Record(Event{
Type: NEW_SCOPE,
Scope: parent,
Attributes: attributes,
})
return ScopeID{
EventID: eventID,
SpanContext: parent.SpanContext,
}
}

View File

@ -21,7 +21,9 @@ import (
"go.opentelemetry.io/api/core"
"go.opentelemetry.io/api/key"
"go.opentelemetry.io/experimental/streaming/exporter/reader"
"go.opentelemetry.io/experimental/streaming/sdk/trace"
// TODO this should not be an SDK dependency; move conventional tags into the API.
"go.opentelemetry.io/experimental/streaming/sdk"
)
var (
@ -119,10 +121,10 @@ func AppendEvent(buf *strings.Builder, data reader.Event) {
data.Tags.Foreach(f(true))
}
if data.SpanContext.HasSpanID() {
f(false)(trace.SpanIDKey.String(data.SpanContext.SpanIDString()))
f(false)(sdk.SpanIDKey.String(data.SpanContext.SpanIDString()))
}
if data.SpanContext.HasTraceID() {
f(false)(trace.TraceIDKey.String(data.SpanContext.TraceIDString()))
f(false)(sdk.TraceIDKey.String(data.SpanContext.TraceIDString()))
}
buf.WriteString(" ]\n")

View File

@ -117,6 +117,11 @@ func NewReaderObserver(readers ...Reader) observer.Observer {
}
func (ro *readerObserver) Observe(event observer.Event) {
// TODO this should check for out-of-order events and buffer.
ro.orderedObserve(event)
}
func (ro *readerObserver) orderedObserve(event observer.Event) {
read := Event{
Time: event.Time,
Sequence: event.Sequence,
@ -169,7 +174,7 @@ func (ro *readerObserver) Observe(event observer.Event) {
case observer.FINISH_SPAN:
attrs, span := ro.readScope(event.Scope)
if span == nil {
panic("span not found")
panic(fmt.Sprint("span not found", event.Scope))
}
read.Name = span.name

View File

@ -18,7 +18,6 @@ import (
"os"
"strings"
"go.opentelemetry.io/experimental/streaming/exporter/buffer"
"go.opentelemetry.io/experimental/streaming/exporter/observer"
"go.opentelemetry.io/experimental/streaming/exporter/spandata"
"go.opentelemetry.io/experimental/streaming/exporter/spandata/format"
@ -27,7 +26,7 @@ import (
type spanLog struct{}
func New() observer.Observer {
return buffer.NewBuffer(1000, spandata.NewReaderObserver(&spanLog{}))
return spandata.NewReaderObserver(&spanLog{})
}
func (s *spanLog) Read(data *spandata.Span) {

View File

@ -1,48 +0,0 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event
import (
"fmt"
"go.opentelemetry.io/api/core"
apievent "go.opentelemetry.io/api/event"
)
type event struct {
message string
attributes []core.KeyValue
}
var _ apievent.Event = (*event)(nil)
// WithAttr creates an Event with Attributes and a message.
// Attributes are immutable.
func WithAttr(msg string, attributes ...core.KeyValue) apievent.Event {
return event{message: msg, attributes: attributes}
}
// WithString creates an Event with formatted string.
func WithString(f string, args ...interface{}) apievent.Event {
return event{message: fmt.Sprint(f, args), attributes: nil}
}
func (e event) Message() string {
return e.message
}
func (e event) Attributes() []core.KeyValue {
return append(e.attributes[:0:0], e.attributes...)
}

View File

@ -0,0 +1,9 @@
package sdk
import (
"go.opentelemetry.io/api/trace"
)
func init() {
trace.SetGlobalTracer(New())
}

View File

@ -12,11 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package trace
package sdk
import (
"context"
"sync"
"google.golang.org/grpc/codes"
@ -29,21 +28,13 @@ import (
type span struct {
tracer *tracer
spanContext core.SpanContext
lock sync.Mutex
eventID observer.EventID
finishOnce sync.Once
recordEvent bool
status codes.Code
initial observer.ScopeID
}
// SpancContext returns span context of the span. Return SpanContext is usable
// even after the span is finished.
func (sp *span) SpanContext() core.SpanContext {
if sp == nil {
return core.INVALID_SPAN_CONTEXT
}
return sp.spanContext
return sp.initial.SpanContext
}
// IsRecordingEvents returns true is the span is active and recording events is enabled.
@ -53,123 +44,56 @@ func (sp *span) IsRecordingEvents() bool {
// SetStatus sets the status of the span.
func (sp *span) SetStatus(status codes.Code) {
if sp == nil {
return
}
sid := sp.ScopeID()
observer.Record(observer.Event{
Type: observer.SET_STATUS,
Scope: sid,
Sequence: sid.EventID,
Scope: sp.ScopeID(),
Status: status,
})
sp.status = status
}
func (sp *span) ScopeID() observer.ScopeID {
if sp == nil {
return observer.ScopeID{}
}
sp.lock.Lock()
sid := observer.ScopeID{
EventID: sp.eventID,
SpanContext: sp.spanContext,
}
sp.lock.Unlock()
return sid
}
func (sp *span) updateScope() (observer.ScopeID, observer.EventID) {
next := observer.NextEventID()
sp.lock.Lock()
sid := observer.ScopeID{
EventID: sp.eventID,
SpanContext: sp.spanContext,
}
sp.eventID = next
sp.lock.Unlock()
return sid, next
}
func (sp *span) SetError(v bool) {
sp.SetAttribute(ErrorKey.Bool(v))
return sp.initial
}
func (sp *span) SetAttribute(attribute core.KeyValue) {
if sp == nil {
return
}
sid, next := sp.updateScope()
observer.Record(observer.Event{
Type: observer.MODIFY_ATTR,
Scope: sid,
Sequence: next,
Scope: sp.ScopeID(),
Attribute: attribute,
})
}
func (sp *span) SetAttributes(attributes ...core.KeyValue) {
if sp == nil {
return
}
sid, next := sp.updateScope()
observer.Record(observer.Event{
Type: observer.MODIFY_ATTR,
Scope: sid,
Sequence: next,
Scope: sp.ScopeID(),
Attributes: attributes,
})
}
func (sp *span) ModifyAttribute(mutator tag.Mutator) {
if sp == nil {
return
}
sid, next := sp.updateScope()
observer.Record(observer.Event{
Type: observer.MODIFY_ATTR,
Scope: sid,
Sequence: next,
Scope: sp.ScopeID(),
Mutator: mutator,
})
}
func (sp *span) ModifyAttributes(mutators ...tag.Mutator) {
if sp == nil {
return
}
sid, next := sp.updateScope()
observer.Record(observer.Event{
Type: observer.MODIFY_ATTR,
Scope: sid,
Sequence: next,
Scope: sp.ScopeID(),
Mutators: mutators,
})
}
func (sp *span) Finish() {
if sp == nil {
return
}
recovered := recover()
sp.finishOnce.Do(func() {
observer.Record(observer.Event{
Type: observer.FINISH_SPAN,
Scope: sp.ScopeID(),
Recovered: recovered,
})
})
if recovered != nil {
panic(recovered)
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package trace
package sdk
import (
"context"
@ -24,7 +24,6 @@ import (
"go.opentelemetry.io/api/trace"
apitrace "go.opentelemetry.io/api/trace"
"go.opentelemetry.io/experimental/streaming/exporter/observer"
"go.opentelemetry.io/experimental/streaming/sdk/event"
)
type tracer struct {
@ -32,6 +31,7 @@ type tracer struct {
}
var (
// TODO These should move somewhere in the api, right?
ServiceKey = key.New("service")
ComponentKey = key.New("component")
ErrorKey = key.New("error")
@ -42,20 +42,17 @@ var (
)
)
var t = &tracer{}
// Register registers tracer to global registry and returns the registered tracer.
func Register() apitrace.Tracer {
apitrace.SetGlobalTracer(t)
return t
func New() trace.Tracer {
return &tracer{}
}
func (t *tracer) WithResources(attributes ...core.KeyValue) apitrace.Tracer {
return t
// s := scope.New(t.resources.Scope(), attributes...)
// return &tracer{
// resources: s.ScopeID().EventID,
// }
s := observer.NewScope(observer.ScopeID{
EventID: t.resources,
}, attributes...)
return &tracer{
resources: s.EventID,
}
}
func (t *tracer) WithComponent(name string) apitrace.Tracer {
@ -74,18 +71,12 @@ func (t *tracer) WithSpan(ctx context.Context, name string, body func(context.Co
if err := body(ctx); err != nil {
span.SetAttribute(ErrorKey.Bool(true))
span.AddEvent(ctx, event.WithAttr("span error", MessageKey.String(err.Error())))
span.Event(ctx, "span error", MessageKey.String(err.Error()))
return err
}
return nil
}
// Start starts a new span with provided name and span options.
// If parent span reference is provided in the span option then it is used as as parent.
// Otherwise, parent span reference is retrieved from current context.
// The new span uses the same TraceID as parent.
// If no parent is found then a root span is created and started with random TraceID.
// TODO: Add sampling logic.
func (t *tracer) Start(ctx context.Context, name string, opts ...apitrace.SpanOption) (context.Context, apitrace.Span) {
var child core.SpanContext
@ -102,8 +93,7 @@ func (t *tracer) Start(ctx context.Context, name string, opts ...apitrace.SpanOp
if o.Reference.HasTraceID() {
parentScope.SpanContext = o.Reference.SpanContext
} else {
parentSpan, _ := apitrace.CurrentSpan(ctx).(*span)
parentScope = parentSpan.ScopeID()
parentScope.SpanContext = apitrace.CurrentSpan(ctx).SpanContext()
}
if parentScope.HasTraceID() {
@ -121,17 +111,19 @@ func (t *tracer) Start(ctx context.Context, name string, opts ...apitrace.SpanOp
}
span := &span{
spanContext: child,
tracer: t,
recordEvent: o.RecordEvent,
eventID: observer.Record(observer.Event{
initial: observer.ScopeID{
SpanContext: child,
EventID: observer.Record(observer.Event{
Time: o.StartTime,
Type: observer.START_SPAN,
Scope: observer.NewScope(childScope, o.Attributes...),
Context: ctx,
Parent: parentScope,
String: name,
}),
},
),
},
}
return trace.SetCurrentSpan(ctx, span), span
}