From e17f4468a6741177690e24733c9dcad8e14dd920 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 14 Jun 2019 11:37:05 -0700 Subject: [PATCH] Golang opentelemetry-go draft API (incomplete) (#9) * Work in progress from https://github.com/lightstep/opentelemetry-golang-prototype * Renames * Rename * Finish rename * Rename packages * README --- README.md | 20 ++ api/core/core.go | 179 ++++++++++++++ api/log/log.go | 52 ++++ api/metric/api.go | 60 +++++ api/metric/common.go | 74 ++++++ api/metric/gauge.go | 39 +++ api/metric/registry.go | 78 ++++++ api/scope/scope.go | 67 ++++++ api/stats/stats.go | 50 ++++ api/tag/api.go | 99 ++++++++ api/tag/map.go | 111 +++++++++ api/tag/tag.go | 200 ++++++++++++++++ api/trace/api.go | 144 +++++++++++ api/trace/span.go | 138 +++++++++++ api/trace/trace.go | 145 +++++++++++ api/unit/unit.go | 11 + example/example.go | 78 ++++++ example/http/client/client.go | 57 +++++ example/http/server/modd.conf | 7 + example/http/server/server.go | 49 ++++ exporter/buffer/buffer.go | 62 +++++ exporter/loader/loader.go | 42 ++++ exporter/observer/eventtype_string.go | 32 +++ exporter/observer/observer.go | 126 ++++++++++ exporter/reader/format/format.go | 107 +++++++++ exporter/reader/reader.go | 332 ++++++++++++++++++++++++++ exporter/spandata/format/format.go | 20 ++ exporter/spandata/spandata.go | 55 +++++ exporter/spanlog/install/package.go | 16 ++ exporter/spanlog/plugin/Makefile | 4 + exporter/spanlog/plugin/package.go | 10 + exporter/spanlog/spanlog.go | 26 ++ exporter/stderr/install/package.go | 16 ++ exporter/stderr/plugin/Makefile | 4 + exporter/stderr/plugin/package.go | 10 + exporter/stderr/stderr.go | 21 ++ exporter/stdout/install/package.go | 16 ++ exporter/stdout/plugin/Makefile | 4 + exporter/stdout/plugin/package.go | 10 + exporter/stdout/stdout.go | 21 ++ plugin/httptrace/api.go | 35 +++ plugin/httptrace/clienttrace.go | 174 ++++++++++++++ plugin/httptrace/httptrace.go | 87 +++++++ 43 files changed, 2888 insertions(+) create mode 100644 README.md create mode 100644 api/core/core.go create mode 100644 api/log/log.go create mode 100644 api/metric/api.go create mode 100644 api/metric/common.go create mode 100644 api/metric/gauge.go create mode 100644 api/metric/registry.go create mode 100644 api/scope/scope.go create mode 100644 api/stats/stats.go create mode 100644 api/tag/api.go create mode 100644 api/tag/map.go create mode 100644 api/tag/tag.go create mode 100644 api/trace/api.go create mode 100644 api/trace/span.go create mode 100644 api/trace/trace.go create mode 100644 api/unit/unit.go create mode 100644 example/example.go create mode 100644 example/http/client/client.go create mode 100644 example/http/server/modd.conf create mode 100644 example/http/server/server.go create mode 100644 exporter/buffer/buffer.go create mode 100644 exporter/loader/loader.go create mode 100644 exporter/observer/eventtype_string.go create mode 100644 exporter/observer/observer.go create mode 100644 exporter/reader/format/format.go create mode 100644 exporter/reader/reader.go create mode 100644 exporter/spandata/format/format.go create mode 100644 exporter/spandata/spandata.go create mode 100644 exporter/spanlog/install/package.go create mode 100644 exporter/spanlog/plugin/Makefile create mode 100644 exporter/spanlog/plugin/package.go create mode 100644 exporter/spanlog/spanlog.go create mode 100644 exporter/stderr/install/package.go create mode 100644 exporter/stderr/plugin/Makefile create mode 100644 exporter/stderr/plugin/package.go create mode 100644 exporter/stderr/stderr.go create mode 100644 exporter/stdout/install/package.go create mode 100644 exporter/stdout/plugin/Makefile create mode 100644 exporter/stdout/plugin/package.go create mode 100644 exporter/stdout/stdout.go create mode 100644 plugin/httptrace/api.go create mode 100644 plugin/httptrace/clienttrace.go create mode 100644 plugin/httptrace/httptrace.go diff --git a/README.md b/README.md new file mode 100644 index 000000000..6b7671153 --- /dev/null +++ b/README.md @@ -0,0 +1,20 @@ +This is a prototype *intended to be modified* into the opentelemetry-go implementation. The `api` directory here should be used as a starting point to introduce a new OpenTelemetry exporter, wherease the existing `exporter/observer` streaming model should be help verify the api + +To run the examples, first build the stderr tracer plugin (requires Linux or OS X): + +``` +(cd ./exporter/stdout/plugin && make) +(cd ./exporter/spanlog/plugin && make) +``` + +Then set the `OPENTELEMETRY_LIB` environment variable to the .so file in that directory, e.g., + +``` +OPENTELEMETRY_LIB=./exporter/stderr/plugin/stderr.so go run ./example/server/server.go +``` + +and + +``` +OPENTELEMETRY_LIB=./exporter/spanlog/plugin/spanlog.so go run ./example/client/client.go +``` diff --git a/api/core/core.go b/api/core/core.go new file mode 100644 index 000000000..8dc005a31 --- /dev/null +++ b/api/core/core.go @@ -0,0 +1,179 @@ +package core + +import ( + "context" + "fmt" + + "github.com/open-telemetry/opentelemetry-go/api/unit" +) + +type ( + ScopeID struct { + EventID + SpanContext + } + + SpanContext struct { + TraceIDHigh uint64 + TraceIDLow uint64 + SpanID uint64 + } + + EventID uint64 + + BaseMeasure interface { + Name() string + Description() string + Unit() unit.Unit + + DefinitionID() EventID + } + + Measure interface { + BaseMeasure + + M(float64) Measurement + V(float64) KeyValue + } + + Measurement struct { + // NOTE: If we add a ScopeID field this can carry + // pre-aggregated measures via the stats.Record API. + Measure Measure + Value float64 + ScopeID ScopeID + } + + Key interface { + BaseMeasure + + Value(ctx context.Context) KeyValue + + Bool(v bool) KeyValue + + Int(v int) KeyValue + Int32(v int32) KeyValue + Int64(v int64) KeyValue + + Uint(v uint) KeyValue + Uint32(v uint32) KeyValue + Uint64(v uint64) KeyValue + + Float32(v float32) KeyValue + Float64(v float64) KeyValue + + String(v string) KeyValue + Bytes(v []byte) KeyValue + } + + KeyValue struct { + Key Key + Value Value + } + + ValueType int + + Value struct { + Type ValueType + Bool bool + Int64 int64 + Uint64 uint64 + Float64 float64 + String string + Bytes []byte + + // TODO Lazy value type? + } + + MutatorOp int + + Mutator struct { + MutatorOp + KeyValue + MeasureMetadata + } + + MeasureMetadata struct { + MaxHops int // -1 == infinite, 0 == do not propagate + + // TODO time to live? + } +) + +const ( + INVALID ValueType = iota + BOOL + INT32 + INT64 + UINT32 + UINT64 + FLOAT32 + FLOAT64 + STRING + BYTES + + INSERT MutatorOp = iota + UPDATE + UPSERT + DELETE +) + +func (sc SpanContext) HasTraceID() bool { + return sc.TraceIDHigh != 0 || sc.TraceIDLow != 0 +} + +func (sc SpanContext) HasSpanID() bool { + return sc.SpanID != 0 +} + +func (sc SpanContext) SpanIDString() string { + p := fmt.Sprintf("%.16x", sc.SpanID) + return p[0:3] + ".." + p[13:16] +} + +func (sc SpanContext) TraceIDString() string { + p1 := fmt.Sprintf("%.16x", sc.TraceIDHigh) + p2 := fmt.Sprintf("%.16x", sc.TraceIDLow) + return p1[0:3] + ".." + p2[13:16] +} + +// TODO make this a lazy one-time conversion. +func (v Value) Emit() string { + switch v.Type { + case BOOL: + return fmt.Sprint(v.Bool) + case INT32, INT64: + return fmt.Sprint(v.Int64) + case UINT32, UINT64: + return fmt.Sprint(v.Uint64) + case FLOAT32, FLOAT64: + return fmt.Sprint(v.Float64) + case STRING: + return v.String + case BYTES: + return string(v.Bytes) + } + return "unknown" +} + +func (m Mutator) WithMaxHops(hops int) Mutator { + m.MaxHops = hops + return m +} + +func (e EventID) Scope() ScopeID { + return ScopeID{ + EventID: e, + } +} + +func (s SpanContext) Scope() ScopeID { + return ScopeID{ + SpanContext: s, + } +} + +func (m Measurement) With(id ScopeID) Measurement { + m.ScopeID = id + return m +} diff --git a/api/log/log.go b/api/log/log.go new file mode 100644 index 000000000..62677e296 --- /dev/null +++ b/api/log/log.go @@ -0,0 +1,52 @@ +package log + +import ( + "context" + + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/api/scope" + "github.com/open-telemetry/opentelemetry-go/exporter/observer" +) + +type ( + Interface interface { + Log(ctx context.Context, msg string, fields ...core.KeyValue) + Logf(ctx context.Context, fmt string, args ...interface{}) + } + + Logger struct { + scope.Scope + } +) + +func With(scope scope.Scope) Logger { + return Logger{scope} +} + +func Log(ctx context.Context, msg string, fields ...core.KeyValue) { + With(scope.Active(ctx)).Log(ctx, msg, fields...) +} + +func Logf(ctx context.Context, fmt string, args ...interface{}) { + With(scope.Active(ctx)).Logf(ctx, fmt, args...) +} + +func (l Logger) Log(ctx context.Context, msg string, fields ...core.KeyValue) { + observer.Record(observer.Event{ + Type: observer.LOG_EVENT, + Scope: l.ScopeID(), + String: msg, + Attributes: fields, + Context: ctx, + }) +} + +func (l Logger) Logf(ctx context.Context, fmt string, args ...interface{}) { + observer.Record(observer.Event{ + Type: observer.LOGF_EVENT, + Scope: l.ScopeID(), + String: fmt, + Arguments: args, + Context: ctx, + }) +} diff --git a/api/metric/api.go b/api/metric/api.go new file mode 100644 index 000000000..861e27d0e --- /dev/null +++ b/api/metric/api.go @@ -0,0 +1,60 @@ +package metric + +import ( + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/api/tag" + "github.com/open-telemetry/opentelemetry-go/api/unit" +) + +type ( + Metric interface { + Measure() core.Measure + + DefinitionID() core.EventID + + Type() MetricType + Fields() []core.Key + Err() error + + base() *baseMetric + } + + MetricType int +) + +const ( + Invalid MetricType = iota + GaugeInt64 + GaugeFloat64 + DerivedGaugeInt64 + DerivedGaugeFloat64 + CumulativeInt64 + CumulativeFloat64 + DerivedCumulativeInt64 + DerivedCumulativeFloat64 +) + +type ( + Option func(*baseMetric, *[]tag.Option) +) + +// WithDescription applies provided description. +func WithDescription(desc string) Option { + return func(_ *baseMetric, to *[]tag.Option) { + *to = append(*to, tag.WithDescription(desc)) + } +} + +// WithUnit applies provided unit. +func WithUnit(unit unit.Unit) Option { + return func(_ *baseMetric, to *[]tag.Option) { + *to = append(*to, tag.WithUnit(unit)) + } +} + +// WithKeys applies the provided dimension keys. +func WithKeys(keys ...core.Key) Option { + return func(bm *baseMetric, _ *[]tag.Option) { + bm.keys = keys + } +} diff --git a/api/metric/common.go b/api/metric/common.go new file mode 100644 index 000000000..f43484e61 --- /dev/null +++ b/api/metric/common.go @@ -0,0 +1,74 @@ +package metric + +import ( + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/api/scope" + "github.com/open-telemetry/opentelemetry-go/api/tag" + "github.com/open-telemetry/opentelemetry-go/exporter/observer" +) + +type ( + baseMetric struct { + measure core.Measure + + mtype MetricType + keys []core.Key + eventID core.EventID + status error // Indicates registry conflict + } + + baseEntry struct { + base *baseMetric + metric Metric + eventID core.EventID + } +) + +func initBaseMetric(name string, mtype MetricType, opts []Option, init Metric) Metric { + var tagOpts []tag.Option + bm := init.base() + + for _, opt := range opts { + opt(bm, &tagOpts) + } + + bm.measure = tag.NewMeasure(name, tagOpts...) + bm.mtype = mtype + + bm.eventID = observer.Record(observer.Event{ + Type: observer.NEW_METRIC, + Scope: bm.measure.DefinitionID().Scope(), + }) + + other, err := GetRegistry().RegisterMetric(init) + if err != nil { + bm.status = err + } + return other +} + +func (bm *baseMetric) base() *baseMetric { + return bm +} + +func (bm *baseMetric) Measure() core.Measure { + return bm.measure +} + +func (bm *baseMetric) Type() MetricType { + return bm.mtype +} + +func (bm *baseMetric) Fields() []core.Key { + return bm.keys +} + +func (bm *baseMetric) Err() error { + return bm.status +} + +func (e *baseEntry) init(m Metric, values []core.KeyValue) { + e.base = m.base() + e.metric = m + e.eventID = scope.New(core.ScopeID{}, values...).ScopeID().EventID +} diff --git a/api/metric/gauge.go b/api/metric/gauge.go new file mode 100644 index 000000000..30f1fd21f --- /dev/null +++ b/api/metric/gauge.go @@ -0,0 +1,39 @@ +package metric + +import ( + "context" + + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/api/stats" +) + +type ( + Float64Gauge struct { + baseMetric + } + + Float64Entry struct { + baseEntry + } +) + +func NewFloat64Gauge(name string, mos ...Option) *Float64Gauge { + m := initBaseMetric(name, GaugeFloat64, mos, &Float64Gauge{}).(*Float64Gauge) + return m +} + +func (g *Float64Gauge) Gauge(values ...core.KeyValue) Float64Entry { + var entry Float64Entry + entry.init(g, values) + return entry +} + +func (g *Float64Gauge) DefinitionID() core.EventID { + return g.eventID +} + +func (g Float64Entry) Set(ctx context.Context, val float64) { + stats.Record(ctx, g.base.measure.M(val).With(core.ScopeID{ + EventID: g.eventID, + })) +} diff --git a/api/metric/registry.go b/api/metric/registry.go new file mode 100644 index 000000000..5726e4c97 --- /dev/null +++ b/api/metric/registry.go @@ -0,0 +1,78 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "errors" + "sync" +) + +type ( + // Registry is a mechanism for avoiding duplicate registration + // of different-type pre-aggregated metrics (in one process). + Registry interface { + RegisterMetric(Metric) (Metric, error) + ForeachMetric(func(string, Metric)) + } + + registry struct { + nameType sync.Map // map[string]Metric + } +) + +var ( + registryLock sync.Mutex + registryGlobal Registry = ®istry{} + + errDuplicateMetricTypeConflict = errors.New("Duplicate metric registration with conflicting type") +) + +// SetRegistry may be used to reset the global metric registry, which should not be +// needed unless for testing purposes. +func SetRegistry(r Registry) { + registryLock.Lock() + defer registryLock.Unlock() + registryGlobal = r +} + +// GetRegistry may be used to access a global list of metric definitions. +func GetRegistry() Registry { + registryLock.Lock() + defer registryLock.Unlock() + return registryGlobal +} + +func (r *registry) RegisterMetric(newMet Metric) (Metric, error) { + name := newMet.Measure().Name() + has, ok := r.nameType.Load(name) + + if ok { + m := has.(Metric) + if m.Type() != newMet.Type() { + return nil, errDuplicateMetricTypeConflict + } + return m, nil + } + + r.nameType.Store(name, newMet) + return newMet, nil +} + +func (r *registry) ForeachMetric(f func(string, Metric)) { + r.nameType.Range(func(key, value interface{}) bool { + f(key.(string), value.(Metric)) + return true + }) +} diff --git a/api/scope/scope.go b/api/scope/scope.go new file mode 100644 index 000000000..ab907a4bb --- /dev/null +++ b/api/scope/scope.go @@ -0,0 +1,67 @@ +package scope + +import ( + "context" + + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/exporter/observer" +) + +type ( + Scope interface { + ScopeID() core.ScopeID + } + + Mutable interface { + Scope + + SetAttribute(core.KeyValue) + SetAttributes(...core.KeyValue) + + ModifyAttribute(core.Mutator) + ModifyAttributes(...core.Mutator) + } + + scopeIdent struct { + id core.ScopeID + } + + scopeKeyType struct{} +) + +var ( + scopeKey = &scopeKeyType{} + emptyScope = &scopeIdent{} +) + +func SetActive(ctx context.Context, scope Scope) context.Context { + return context.WithValue(ctx, scopeKey, scope) +} + +func Active(ctx context.Context) Scope { + if scope, has := ctx.Value(scopeKey).(Scope); has { + return scope + } + return emptyScope +} + +func (s *scopeIdent) ScopeID() core.ScopeID { + if s == nil { + return core.ScopeID{} + } + return s.id +} + +func New(parent core.ScopeID, attributes ...core.KeyValue) Scope { + eventID := observer.Record(observer.Event{ + Type: observer.NEW_SCOPE, + Scope: parent, + Attributes: attributes, + }) + return &scopeIdent{ + id: core.ScopeID{ + EventID: eventID, + SpanContext: parent.SpanContext, + }, + } +} diff --git a/api/stats/stats.go b/api/stats/stats.go new file mode 100644 index 000000000..8445c1a20 --- /dev/null +++ b/api/stats/stats.go @@ -0,0 +1,50 @@ +package stats + +import ( + "context" + + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/api/scope" + "github.com/open-telemetry/opentelemetry-go/exporter/observer" +) + +type ( + Interface interface { + Record(ctx context.Context, m ...core.Measurement) + RecordSingle(ctx context.Context, m core.Measurement) + } + + Recorder struct { + core.ScopeID + } +) + +func With(scope scope.Scope) Recorder { + return Recorder{scope.ScopeID()} +} + +func Record(ctx context.Context, m ...core.Measurement) { + With(scope.Active(ctx)).Record(ctx, m...) +} + +func RecordSingle(ctx context.Context, m core.Measurement) { + With(scope.Active(ctx)).RecordSingle(ctx, m) +} + +func (r Recorder) Record(ctx context.Context, m ...core.Measurement) { + observer.Record(observer.Event{ + Type: observer.RECORD_STATS, + Scope: r.ScopeID, + Context: ctx, + Stats: m, + }) +} + +func (r Recorder) RecordSingle(ctx context.Context, m core.Measurement) { + observer.Record(observer.Event{ + Type: observer.RECORD_STATS, + Scope: r.ScopeID, + Context: ctx, + Stat: m, + }) +} diff --git a/api/tag/api.go b/api/tag/api.go new file mode 100644 index 000000000..9021818fd --- /dev/null +++ b/api/tag/api.go @@ -0,0 +1,99 @@ +package tag + +import ( + "context" + + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/api/unit" +) + +type ( + Map interface { + // TODO combine these four into a struct + Apply(a1 core.KeyValue, attributes []core.KeyValue, m1 core.Mutator, mutators []core.Mutator) Map + + Value(core.Key) (core.Value, bool) + HasValue(core.Key) bool + + Len() int + + Foreach(func(kv core.KeyValue) bool) + } + + Option func(*registeredKey) +) + +var ( + EmptyMap = NewMap(core.KeyValue{}, nil, core.Mutator{}, nil) +) + +func New(name string, opts ...Option) core.Key { // TODO rename NewKey? + return register(name, opts) +} + +func NewMeasure(name string, opts ...Option) core.Measure { + return measure{ + rk: register(name, opts), + } +} + +func NewMap(a1 core.KeyValue, attributes []core.KeyValue, m1 core.Mutator, mutators []core.Mutator) Map { + var t tagMap + return t.Apply(a1, attributes, m1, mutators) +} + +func (input tagMap) Apply(a1 core.KeyValue, attributes []core.KeyValue, m1 core.Mutator, mutators []core.Mutator) Map { + m := make(tagMap, len(input)+len(attributes)+len(mutators)) + for k, v := range input { + m[k] = v + } + if a1.Key != nil { + m[a1.Key] = tagContent{ + value: a1.Value, + } + } + for _, kv := range attributes { + m[kv.Key] = tagContent{ + value: kv.Value, + } + } + if m1.KeyValue.Key != nil { + m.apply(m1) + } + for _, mutator := range mutators { + m.apply(mutator) + } + return m +} + +func WithMap(ctx context.Context, m Map) context.Context { + return context.WithValue(ctx, ctxTagsKey, m) +} + +func NewContext(ctx context.Context, mutators ...core.Mutator) context.Context { + return WithMap(ctx, FromContext(ctx).Apply( + core.KeyValue{}, nil, + core.Mutator{}, mutators, + )) +} + +func FromContext(ctx context.Context) Map { + if m, ok := ctx.Value(ctxTagsKey).(Map); ok { + return m + } + return tagMap{} +} + +// WithDescription applies provided description. +func WithDescription(desc string) Option { + return func(rk *registeredKey) { + rk.desc = desc + } +} + +// WithUnit applies provided unit. +func WithUnit(unit unit.Unit) Option { + return func(rk *registeredKey) { + rk.unit = unit + } +} diff --git a/api/tag/map.go b/api/tag/map.go new file mode 100644 index 000000000..2d885f6b2 --- /dev/null +++ b/api/tag/map.go @@ -0,0 +1,111 @@ +package tag + +import ( + "context" + "runtime/pprof" + + "github.com/open-telemetry/opentelemetry-go/api/core" +) + +type ( + tagMap map[core.Key]tagContent + + tagContent struct { + value core.Value + meta core.MeasureMetadata + } +) + +func (m tagMap) HasValue(k core.Key) bool { + _, has := m.Value(k) + return has +} + +func (m tagMap) Value(k core.Key) (core.Value, bool) { + entry, ok := m[k] + if !ok { + entry.value.Type = core.INVALID + } + return entry.value, ok +} + +func (m tagMap) apply(mutator core.Mutator) { + if m == nil { + return + } + key := mutator.KeyValue.Key + content := tagContent{ + value: mutator.KeyValue.Value, + meta: mutator.MeasureMetadata, + } + switch mutator.MutatorOp { + case core.INSERT: + if _, ok := m[key]; !ok { + m[key] = content + } + case core.UPDATE: + if _, ok := m[key]; ok { + m[key] = content + } + case core.UPSERT: + m[key] = content + case core.DELETE: + delete(m, key) + } +} + +func Insert(kv core.KeyValue) core.Mutator { + return core.Mutator{ + MutatorOp: core.INSERT, + KeyValue: kv, + } +} + +func Update(kv core.KeyValue) core.Mutator { + return core.Mutator{ + MutatorOp: core.UPDATE, + KeyValue: kv, + } +} + +func Upsert(kv core.KeyValue) core.Mutator { + return core.Mutator{ + MutatorOp: core.UPSERT, + KeyValue: kv, + } +} + +func Delete(k core.Key) core.Mutator { + return core.Mutator{ + MutatorOp: core.DELETE, + KeyValue: core.KeyValue{ + Key: k, + }, + } +} + +// Note: the golang pprof.Do API forces this memory allocation, we +// should file an issue about that. (There's a TODO in the source.) +func Do(ctx context.Context, f func(ctx context.Context)) { + m := FromContext(ctx).(tagMap) + keyvals := make([]string, 0, 2*len(m)) + for k, v := range m { + keyvals = append(keyvals, k.Name(), v.value.Emit()) + } + pprof.Do(ctx, pprof.Labels(keyvals...), f) +} + +func (m tagMap) Foreach(f func(kv core.KeyValue) bool) { + for k, v := range m { + if !f(core.KeyValue{ + Key: k, + Value: v.value, + }) { + return + } + } +} + +func (m tagMap) Len() int { + return len(m) +} diff --git a/api/tag/tag.go b/api/tag/tag.go new file mode 100644 index 000000000..16b03de64 --- /dev/null +++ b/api/tag/tag.go @@ -0,0 +1,200 @@ +package tag + +import ( + "context" + "unsafe" + + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/api/unit" + "github.com/open-telemetry/opentelemetry-go/exporter/observer" +) + +type ( + registeredKey struct { + name string + desc string + unit unit.Unit + eventID core.EventID + } + + ctxTagsType struct{} + + measure struct { + rk *registeredKey + } +) + +var ( + ctxTagsKey = &ctxTagsType{} +) + +func register(name string, opts []Option) *registeredKey { + rk := ®isteredKey{ + name: name, + } + for _, of := range opts { + of(rk) + } + rk.eventID = observer.Record(observer.Event{ + Type: observer.NEW_MEASURE, + String: name, + // TODO desc, unit + }) + return rk +} + +func (k *registeredKey) Name() string { + if k == nil { + return "unregistered" + } + return k.name +} + +func (k *registeredKey) Description() string { + if k == nil { + return "" + } + return k.desc +} + +func (k *registeredKey) Unit() unit.Unit { + if k == nil { + return unit.Dimensionless + } + return k.unit +} + +func (k *registeredKey) DefinitionID() core.EventID { + if k == nil { + return 0 + } + return k.eventID +} + +func (k *registeredKey) Bool(v bool) core.KeyValue { + return core.KeyValue{ + Key: k, + Value: core.Value{ + Type: core.BOOL, + Bool: v, + }, + } +} + +func (k *registeredKey) Int64(v int64) core.KeyValue { + return core.KeyValue{ + Key: k, + Value: core.Value{ + Type: core.INT64, + Int64: v, + }, + } +} + +func (k *registeredKey) Uint64(v uint64) core.KeyValue { + return core.KeyValue{ + Key: k, + Value: core.Value{ + Type: core.UINT64, + Uint64: v, + }, + } +} + +func (k *registeredKey) Float64(v float64) core.KeyValue { + return core.KeyValue{ + Key: k, + Value: core.Value{ + Type: core.FLOAT64, + Float64: v, + }, + } +} + +func (k *registeredKey) Int32(v int32) core.KeyValue { + return core.KeyValue{ + Key: k, + Value: core.Value{ + Type: core.INT32, + Int64: int64(v), + }, + } +} + +func (k *registeredKey) Uint32(v uint32) core.KeyValue { + return core.KeyValue{ + Key: k, + Value: core.Value{ + Type: core.UINT32, + Uint64: uint64(v), + }, + } +} + +func (k *registeredKey) Float32(v float32) core.KeyValue { + return core.KeyValue{ + Key: k, + Value: core.Value{ + Type: core.FLOAT32, + Float64: float64(v), + }, + } +} + +func (k *registeredKey) String(v string) core.KeyValue { + return core.KeyValue{ + Key: k, + Value: core.Value{ + Type: core.STRING, + String: v, + }, + } +} + +func (k *registeredKey) Bytes(v []byte) core.KeyValue { + return core.KeyValue{ + Key: k, + Value: core.Value{ + Type: core.BYTES, + Bytes: v, + }, + } +} + +func (k *registeredKey) Int(v int) core.KeyValue { + if unsafe.Sizeof(v) == 4 { + return k.Int32(int32(v)) + } + return k.Int64(int64(v)) +} + +func (k *registeredKey) Uint(v uint) core.KeyValue { + if unsafe.Sizeof(v) == 4 { + return k.Uint32(uint32(v)) + } + return k.Uint64(uint64(v)) +} + +func (k *registeredKey) Value(ctx context.Context) core.KeyValue { + v, _ := FromContext(ctx).Value(k) + return core.KeyValue{ + Key: k, + Value: v, + } +} + +func (m measure) M(v float64) core.Measurement { + return core.Measurement{ + Measure: m, + Value: v, + } +} + +func (m measure) V(v float64) core.KeyValue { + return m.rk.Float64(v) +} + +func (m measure) Name() string { return m.rk.Name() } +func (m measure) Description() string { return m.rk.Description() } +func (m measure) Unit() unit.Unit { return m.rk.Unit() } +func (m measure) DefinitionID() core.EventID { return m.rk.DefinitionID() } diff --git a/api/trace/api.go b/api/trace/api.go new file mode 100644 index 000000000..0df0bc4f5 --- /dev/null +++ b/api/trace/api.go @@ -0,0 +1,144 @@ +package trace + +import ( + "context" + "time" + + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/api/log" + "github.com/open-telemetry/opentelemetry-go/api/scope" + "github.com/open-telemetry/opentelemetry-go/api/stats" + "github.com/open-telemetry/opentelemetry-go/api/tag" +) + +type ( + Tracer interface { + Start(context.Context, string, ...Option) (context.Context, Span) + + WithSpan( + ctx context.Context, + operation string, + body func(ctx context.Context) error, + ) error + + WithService(name string) Tracer + WithComponent(name string) Tracer + WithResources(res ...core.KeyValue) Tracer + + // Note: see https://github.com/opentracing/opentracing-go/issues/127 + Inject(context.Context, Span, Injector) + + // ScopeID returns the resource scope of this tracer. + scope.Scope + } + + Span interface { + scope.Mutable + + log.Interface + + stats.Interface + + SetError(bool) + + Tracer() Tracer + + Finish() + } + + Injector interface { + Inject(core.SpanContext, tag.Map) + } + + Option struct { + attribute core.KeyValue + attributes []core.KeyValue + startTime time.Time + reference Reference + } + + Reference struct { + core.SpanContext + RelationshipType + } + + RelationshipType int +) + +const ( + ChildOfRelationship RelationshipType = iota + FollowsFromRelationship +) + +func GlobalTracer() Tracer { + if t := global.Load(); t != nil { + return t.(Tracer) + } + return empty +} + +func SetGlobalTracer(t Tracer) { + global.Store(t) +} + +func Start(ctx context.Context, name string, opts ...Option) (context.Context, Span) { + return GlobalTracer().Start(ctx, name, opts...) +} + +func Active(ctx context.Context) Span { + span, _ := scope.Active(ctx).(*span) + return span +} + +func WithSpan(ctx context.Context, name string, body func(context.Context) error) error { + return GlobalTracer().WithSpan(ctx, name, body) +} + +func SetError(ctx context.Context, v bool) { + Active(ctx).SetError(v) +} + +func Inject(ctx context.Context, injector Injector) { + span := Active(ctx) + if span == nil { + return + } + + span.Tracer().Inject(ctx, span, injector) +} + +func WithStartTime(t time.Time) Option { + return Option{ + startTime: t, + } +} + +func WithAttributes(attrs ...core.KeyValue) Option { + return Option{ + attributes: attrs, + } +} + +func WithAttribute(attr core.KeyValue) Option { + return Option{ + attribute: attr, + } +} + +func ChildOf(sc core.SpanContext) Option { + return Option{ + reference: Reference{ + SpanContext: sc, + RelationshipType: ChildOfRelationship, + }, + } +} + +func FollowsFrom(sc core.SpanContext) Option { + return Option{ + reference: Reference{ + SpanContext: sc, + RelationshipType: FollowsFromRelationship, + }, + } +} diff --git a/api/trace/span.go b/api/trace/span.go new file mode 100644 index 000000000..19b3894e4 --- /dev/null +++ b/api/trace/span.go @@ -0,0 +1,138 @@ +package trace + +import ( + "context" + + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/api/log" + "github.com/open-telemetry/opentelemetry-go/api/stats" + "github.com/open-telemetry/opentelemetry-go/exporter/observer" +) + +func (sp *span) ScopeID() core.ScopeID { + if sp == nil { + return core.ScopeID{} + } + sp.lock.Lock() + sid := core.ScopeID{ + EventID: sp.eventID, + SpanContext: sp.spanContext, + } + sp.lock.Unlock() + return sid +} + +func (sp *span) updateScope() (core.ScopeID, core.EventID) { + next := observer.NextEventID() + + sp.lock.Lock() + sid := core.ScopeID{ + EventID: sp.eventID, + SpanContext: sp.spanContext, + } + sp.eventID = next + sp.lock.Unlock() + + return sid, next +} + +func (sp *span) SetError(v bool) { + sp.SetAttribute(ErrorKey.Bool(v)) +} + +func (sp *span) SetAttribute(attribute core.KeyValue) { + if sp == nil { + return + } + + sid, next := sp.updateScope() + + observer.Record(observer.Event{ + Type: observer.MODIFY_ATTR, + Scope: sid, + Sequence: next, + Attribute: attribute, + }) +} + +func (sp *span) SetAttributes(attributes ...core.KeyValue) { + if sp == nil { + return + } + + sid, next := sp.updateScope() + + observer.Record(observer.Event{ + Type: observer.MODIFY_ATTR, + Scope: sid, + Sequence: next, + Attributes: attributes, + }) +} + +func (sp *span) ModifyAttribute(mutator core.Mutator) { + if sp == nil { + return + } + + sid, next := sp.updateScope() + + observer.Record(observer.Event{ + Type: observer.MODIFY_ATTR, + Scope: sid, + Sequence: next, + Mutator: mutator, + }) +} + +func (sp *span) ModifyAttributes(mutators ...core.Mutator) { + if sp == nil { + return + } + + sid, next := sp.updateScope() + + observer.Record(observer.Event{ + Type: observer.MODIFY_ATTR, + Scope: sid, + Sequence: next, + Mutators: mutators, + }) +} + +func (sp *span) Finish() { + if sp == nil { + return + } + recovered := recover() + sp.finishOnce.Do(func() { + observer.Record(observer.Event{ + Type: observer.FINISH_SPAN, + Scope: sp.ScopeID(), + Recovered: recovered, + }) + }) + if recovered != nil { + panic(recovered) + } +} + +func (sp *span) Tracer() Tracer { + return sp.tracer +} + +func (sp *span) Log(ctx context.Context, msg string, args ...core.KeyValue) { + log.With(sp).Log(ctx, msg, args...) +} + +func (sp *span) Logf(ctx context.Context, fmt string, args ...interface{}) { + log.With(sp).Logf(ctx, fmt, args...) +} + +func (sp *span) Record(ctx context.Context, m ...core.Measurement) { + stats.With(sp).Record(ctx, m...) +} + +func (sp *span) RecordSingle(ctx context.Context, m core.Measurement) { + stats.With(sp).RecordSingle(ctx, m) +} diff --git a/api/trace/trace.go b/api/trace/trace.go new file mode 100644 index 000000000..12bfa6638 --- /dev/null +++ b/api/trace/trace.go @@ -0,0 +1,145 @@ +package trace + +import ( + "context" + "math/rand" + "sync" + "sync/atomic" + "time" + + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/api/log" + "github.com/open-telemetry/opentelemetry-go/api/scope" + "github.com/open-telemetry/opentelemetry-go/api/tag" + "github.com/open-telemetry/opentelemetry-go/exporter/observer" +) + +type ( + span struct { + tracer *tracer + spanContext core.SpanContext + lock sync.Mutex + eventID core.EventID + finishOnce sync.Once + } + + tracer struct { + resources core.EventID + } +) + +var ( + ServiceKey = tag.New("service") + ComponentKey = tag.New("component") + ErrorKey = tag.New("error") + SpanIDKey = tag.New("span_id") + TraceIDKey = tag.New("trace_id") + ParentSpanIDKey = tag.New("parent_span_id") + MessageKey = tag.New("message", + tag.WithDescription("message text: info, error, etc"), + ) + + // The process global tracer could have process-wide resource + // tags applied directly, or we can have a SetGlobal tracer to + // install a default tracer w/ resources. + global atomic.Value + empty = &tracer{} +) + +func (t *tracer) ScopeID() core.ScopeID { + return t.resources.Scope() +} + +func (t *tracer) WithResources(attributes ...core.KeyValue) Tracer { + s := scope.New(t.resources.Scope(), attributes...) + return &tracer{ + resources: s.ScopeID().EventID, + } +} + +func (g *tracer) WithComponent(name string) Tracer { + return g.WithResources(ComponentKey.String(name)) +} + +func (g *tracer) WithService(name string) Tracer { + return g.WithResources(ServiceKey.String(name)) +} + +func (t *tracer) WithSpan(ctx context.Context, name string, body func(context.Context) error) error { + // TODO: use runtime/trace.WithRegion for execution tracer support + // TODO: use runtime/pprof.Do for profile tags support + ctx, span := t.Start(ctx, name) + defer span.Finish() + + if err := body(ctx); err != nil { + span.SetAttribute(ErrorKey.Bool(true)) + log.Log(ctx, "span error", MessageKey.String(err.Error())) + return err + } + return nil +} + +func (t *tracer) Start(ctx context.Context, name string, opts ...Option) (context.Context, Span) { + var child core.SpanContext + + child.SpanID = rand.Uint64() + + var startTime time.Time + var attributes []core.KeyValue + var reference Reference + + for _, opt := range opts { + if !opt.startTime.IsZero() { + startTime = opt.startTime + } + if len(opt.attributes) != 0 { + attributes = append(opt.attributes, attributes...) + } + if opt.attribute.Key != nil { + attributes = append(attributes, opt.attribute) + } + if opt.reference.HasTraceID() { + reference = opt.reference + } + } + + var parentScope core.ScopeID + + if reference.HasTraceID() { + parentScope = reference.Scope() + } else { + parentScope = Active(ctx).ScopeID() + } + + if parentScope.HasTraceID() { + parent := parentScope.SpanContext + child.TraceIDHigh = parent.TraceIDHigh + child.TraceIDLow = parent.TraceIDLow + } else { + child.TraceIDHigh = rand.Uint64() + child.TraceIDLow = rand.Uint64() + } + + childScope := core.ScopeID{ + SpanContext: child, + EventID: t.resources, + } + + span := &span{ + spanContext: child, + tracer: t, + eventID: observer.Record(observer.Event{ + Time: startTime, + Type: observer.START_SPAN, + Scope: scope.New(childScope, attributes...).ScopeID(), + Context: ctx, + Parent: parentScope, + String: name, + }), + } + return scope.SetActive(ctx, span), span +} + +func (t *tracer) Inject(ctx context.Context, span Span, injector Injector) { + injector.Inject(span.ScopeID().SpanContext, tag.FromContext(ctx)) +} diff --git a/api/unit/unit.go b/api/unit/unit.go new file mode 100644 index 000000000..694663910 --- /dev/null +++ b/api/unit/unit.go @@ -0,0 +1,11 @@ +package unit + +type ( + Unit string +) + +const ( + Dimensionless Unit = "1" + Bytes Unit = "By" + Milliseconds Unit = "ms" +) diff --git a/example/example.go b/example/example.go new file mode 100644 index 000000000..993f54aea --- /dev/null +++ b/example/example.go @@ -0,0 +1,78 @@ +package main + +import ( + "context" + + "github.com/open-telemetry/opentelemetry-go/api/log" + "github.com/open-telemetry/opentelemetry-go/api/metric" + "github.com/open-telemetry/opentelemetry-go/api/stats" + "github.com/open-telemetry/opentelemetry-go/api/tag" + "github.com/open-telemetry/opentelemetry-go/api/trace" + + "github.com/open-telemetry/opentelemetry-go/exporter/loader" +) + +var ( + tracer = trace.GlobalTracer(). + WithComponent("example"). + WithResources( + tag.New("whatevs").String("yesss"), + ) + + fooKey = tag.New("ex.com/foo", tag.WithDescription("A Foo var")) + barKey = tag.New("ex.com/bar", tag.WithDescription("A Bar var")) + lemonsKey = tag.New("ex.com/lemons", tag.WithDescription("A Lemons var")) + anotherKey = tag.New("ex.com/another") + + oneMetric = metric.NewFloat64Gauge("ex.com/one", + metric.WithKeys(fooKey, barKey, lemonsKey), + metric.WithDescription("A gauge set to 1.0"), + ) + + measureTwo = tag.NewMeasure("ex.com/two") +) + +func main() { + ctx := context.Background() + + ctx = tag.NewContext(ctx, + tag.Insert(fooKey.String("foo1")), + tag.Insert(barKey.String("bar1")), + ) + + gauge := oneMetric.Gauge( + fooKey.Value(ctx), + barKey.Value(ctx), + lemonsKey.Int(10), + ) + + err := tracer.WithSpan(ctx, "operation", func(ctx context.Context) error { + + trace.SetError(ctx, true) + + log.Log(ctx, "Nice operation!", tag.New("bogons").Int(100)) + + trace.Active(ctx).SetAttributes(anotherKey.String("yes")) + + gauge.Set(ctx, 1) + + return tracer.WithSpan( + ctx, + "Sub operation...", + func(ctx context.Context) error { + trace.Active(ctx).SetAttribute(lemonsKey.String("five")) + + log.Logf(ctx, "Format schmormat %d!", 100) + + stats.Record(ctx, measureTwo.M(1.3)) + + return nil + }, + ) + }) + if err != nil { + panic(err) + } + + loader.Flush() +} diff --git a/example/http/client/client.go b/example/http/client/client.go new file mode 100644 index 000000000..7bcc1c4de --- /dev/null +++ b/example/http/client/client.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + "fmt" + "io/ioutil" + "net/http" + + "github.com/open-telemetry/opentelemetry-go/api/tag" + "github.com/open-telemetry/opentelemetry-go/api/trace" + "github.com/open-telemetry/opentelemetry-go/plugin/httptrace" + + _ "github.com/open-telemetry/opentelemetry-go/exporter/loader" +) + +var ( + tracer = trace.GlobalTracer(). + WithService("client"). + WithComponent("main"). + WithResources( + tag.New("whatevs").String("yesss"), + ) +) + +func main() { + client := http.DefaultClient + + ctx := tag.NewContext(context.Background(), + tag.Insert(tag.New("username").String("donuts")), + ) + + var body []byte + + err := tracer.WithSpan(ctx, "say hello", + func(ctx context.Context) error { + req, _ := http.NewRequest("GET", "http://localhost:7777/hello", nil) + + ctx, req, inj := httptrace.W3C(ctx, req) + + trace.Inject(ctx, inj) + + res, err := client.Do(req) + if err != nil { + panic(err) + } + body, err = ioutil.ReadAll(res.Body) + res.Body.Close() + + return err + }) + + if err != nil { + panic(err) + } + + fmt.Printf("%s", body) +} diff --git a/example/http/server/modd.conf b/example/http/server/modd.conf new file mode 100644 index 000000000..da54262c7 --- /dev/null +++ b/example/http/server/modd.conf @@ -0,0 +1,7 @@ +# A basic modd.conf file for Go development. + +# Run go test on ALL modules on startup, and subsequently only on modules +# containing changes. +server.go { + daemon +sigterm: go run server.go +} \ No newline at end of file diff --git a/example/http/server/server.go b/example/http/server/server.go new file mode 100644 index 000000000..2f84585e5 --- /dev/null +++ b/example/http/server/server.go @@ -0,0 +1,49 @@ +package main + +import ( + "io" + "net/http" + + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/api/log" + "github.com/open-telemetry/opentelemetry-go/api/tag" + "github.com/open-telemetry/opentelemetry-go/api/trace" + "github.com/open-telemetry/opentelemetry-go/plugin/httptrace" + + _ "github.com/open-telemetry/opentelemetry-go/exporter/loader" +) + +var ( + tracer = trace.GlobalTracer(). + WithService("server"). + WithComponent("main"). + WithResources( + tag.New("whatevs").String("nooooo"), + ) +) + +func main() { + helloHandler := func(w http.ResponseWriter, req *http.Request) { + attrs, tags, spanCtx := httptrace.Extract(req) + + req = req.WithContext(tag.WithMap(req.Context(), tag.NewMap(core.KeyValue{}, tags, core.Mutator{}, nil))) + + ctx, span := tracer.Start( + req.Context(), + "hello", + trace.WithAttributes(attrs...), + trace.ChildOf(spanCtx), + ) + defer span.Finish() + + log.Log(ctx, "handling this...") + + io.WriteString(w, "Hello, world!\n") + } + + http.HandleFunc("/hello", helloHandler) + err := http.ListenAndServe(":7777", nil) + if err != nil { + panic(err) + } +} diff --git a/exporter/buffer/buffer.go b/exporter/buffer/buffer.go new file mode 100644 index 000000000..4b4c94e50 --- /dev/null +++ b/exporter/buffer/buffer.go @@ -0,0 +1,62 @@ +package buffer + +import ( + "sync" + "sync/atomic" + + "github.com/open-telemetry/opentelemetry-go/exporter/observer" +) + +type ( + Buffer struct { + observers []observer.Observer + events chan observer.Event + dropped uint64 + wait sync.WaitGroup + close chan struct{} + } +) + +func NewBuffer(size int, observers ...observer.Observer) *Buffer { + b := &Buffer{ + observers: observers, + events: make(chan observer.Event, size), + close: make(chan struct{}), + } + b.wait.Add(1) + go b.run() + return b +} + +func (b *Buffer) Observe(data observer.Event) { + select { + case b.events <- data: + default: + atomic.AddUint64(&b.dropped, 1) + } +} + +func (b *Buffer) Close() { + close(b.close) + b.wait.Wait() +} + +func (b *Buffer) run() { + defer func() { + _ = recover() + b.wait.Done() + }() + + for { + select { + case <-b.close: + return + case ev := <-b.events: + // TODO: This has to ensure ordered arrival, + // e.g., put into a heap and delay observations. + for _, obs := range b.observers { + obs.Observe(ev) + } + } + } +} diff --git a/exporter/loader/loader.go b/exporter/loader/loader.go new file mode 100644 index 000000000..6e3cd0513 --- /dev/null +++ b/exporter/loader/loader.go @@ -0,0 +1,42 @@ +package loader + +import ( + "fmt" + "os" + "plugin" + "time" + + "github.com/open-telemetry/opentelemetry-go/exporter/observer" +) + +// TODO add buffer support directly, eliminate stdout + +func init() { + pluginName := os.Getenv("OPENTELEMETRY_LIB") + if pluginName == "" { + return + } + sharedObj, err := plugin.Open(pluginName) + if err != nil { + fmt.Println("Open failed", pluginName, err) + return + } + + obsPlugin, err := sharedObj.Lookup("Observer") + if err != nil { + fmt.Println("Observer not found", pluginName, err) + return + } + + obs, ok := obsPlugin.(*observer.Observer) + if !ok { + fmt.Printf("Observer not valid\n") + return + } + observer.RegisterObserver(*obs) +} + +func Flush() { + // TODO implement for exporter/{stdout,stderr,buffer} + time.Sleep(1 * time.Second) +} diff --git a/exporter/observer/eventtype_string.go b/exporter/observer/eventtype_string.go new file mode 100644 index 000000000..0a145453f --- /dev/null +++ b/exporter/observer/eventtype_string.go @@ -0,0 +1,32 @@ +// Code generated by "stringer -type=EventType"; DO NOT EDIT. + +package observer + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[INVALID-0] + _ = x[START_SPAN-1] + _ = x[FINISH_SPAN-2] + _ = x[LOG_EVENT-3] + _ = x[LOGF_EVENT-4] + _ = x[NEW_SCOPE-5] + _ = x[NEW_MEASURE-6] + _ = x[NEW_METRIC-7] + _ = x[MODIFY_ATTR-8] + _ = x[RECORD_STATS-9] +} + +const _EventType_name = "INVALIDSTART_SPANFINISH_SPANLOG_EVENTLOGF_EVENTNEW_SCOPENEW_MEASURENEW_METRICMODIFY_ATTRRECORD_STATS" + +var _EventType_index = [...]uint8{0, 7, 17, 28, 37, 47, 56, 67, 77, 88, 100} + +func (i EventType) String() string { + if i < 0 || i >= EventType(len(_EventType_index)-1) { + return "EventType(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _EventType_name[_EventType_index[i]:_EventType_index[i+1]] +} diff --git a/exporter/observer/observer.go b/exporter/observer/observer.go new file mode 100644 index 000000000..25f76051a --- /dev/null +++ b/exporter/observer/observer.go @@ -0,0 +1,126 @@ +package observer + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/open-telemetry/opentelemetry-go/api/core" +) + +type ( + EventType int + + Event struct { + // Automatic fields + Sequence core.EventID // Auto-filled + Time time.Time // Auto-filled + + // Type, Scope, Context + Type EventType // All events + Scope core.ScopeID // All events + Context context.Context // core.FromContext() and scope.Active() + + // Arguments (type-specific) + Attribute core.KeyValue // SET_ATTRIBUTE + Attributes []core.KeyValue // SET_ATTRIBUTES, LOG_EVENT + Mutator core.Mutator // SET_ATTRIBUTE + Mutators []core.Mutator // SET_ATTRIBUTES + Arguments []interface{} // LOGF_EVENT + Recovered interface{} // FINISH_SPAN + + // Values + String string // START_SPAN, EVENT, ... + Float64 float64 + Parent core.ScopeID // START_SPAN + Stats []core.Measurement + Stat core.Measurement + } + + Observer interface { + Observe(data Event) + } + + observersMap map[Observer]struct{} +) + +//go:generate stringer -type=EventType +const ( + // TODO: rename these NOUN_VERB + INVALID EventType = iota + START_SPAN + FINISH_SPAN + LOG_EVENT + LOGF_EVENT + NEW_SCOPE + NEW_MEASURE + NEW_METRIC + MODIFY_ATTR + RECORD_STATS +) + +var ( + observerMu sync.Mutex + observers atomic.Value + + sequenceNum uint64 +) + +func NextEventID() core.EventID { + return core.EventID(atomic.AddUint64(&sequenceNum, 1)) +} + +// RegisterObserver adds to the list of Observers that will receive sampled +// trace spans. +// +// Binaries can register observers, libraries shouldn't register observers. +func RegisterObserver(e Observer) { + observerMu.Lock() + new := make(observersMap) + if old, ok := observers.Load().(observersMap); ok { + for k, v := range old { + new[k] = v + } + } + new[e] = struct{}{} + observers.Store(new) + observerMu.Unlock() +} + +// UnregisterObserver removes from the list of Observers the Observer that was +// registered with the given name. +func UnregisterObserver(e Observer) { + observerMu.Lock() + new := make(observersMap) + if old, ok := observers.Load().(observersMap); ok { + for k, v := range old { + new[k] = v + } + } + delete(new, e) + observers.Store(new) + observerMu.Unlock() +} + +func Record(event Event) core.EventID { + if event.Sequence == 0 { + event.Sequence = NextEventID() + } + if event.Time.IsZero() { + event.Time = time.Now() + } + + observers, _ := observers.Load().(observersMap) + for observer, _ := range observers { + observer.Observe(event) + } + return event.Sequence +} + +func Foreach(f func(Observer)) { + observers, _ := observers.Load().(observersMap) + for observer, _ := range observers { + f(observer) + } +} diff --git a/exporter/reader/format/format.go b/exporter/reader/format/format.go new file mode 100644 index 000000000..c2d81a025 --- /dev/null +++ b/exporter/reader/format/format.go @@ -0,0 +1,107 @@ +package format + +import ( + "fmt" + "strings" + + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/api/trace" + "github.com/open-telemetry/opentelemetry-go/exporter/reader" +) + +func AppendEvent(buf *strings.Builder, data reader.Event) { + + f := func(skipIf bool) func(kv core.KeyValue) bool { + return func(kv core.KeyValue) bool { + if skipIf && data.Attributes.HasValue(kv.Key) { + return true + } + buf.WriteString(" " + kv.Key.Name() + "=" + kv.Value.Emit()) + return true + } + } + + buf.WriteString(data.Time.Format("2006/01/02 15-04-05.000000")) + buf.WriteString(" ") + + switch data.Type { + case reader.START_SPAN: + buf.WriteString("start ") + buf.WriteString(data.Name) + + if !data.Parent.HasSpanID() { + buf.WriteString(", a root span") + } else { + buf.WriteString(" <") + if data.Parent.HasSpanID() { + f(false)(trace.ParentSpanIDKey.String(data.SpanContext.SpanIDString())) + } + if data.ParentAttributes != nil { + data.ParentAttributes.Foreach(f(false)) + } + buf.WriteString(" >") + } + + case reader.FINISH_SPAN: + buf.WriteString("finish ") + buf.WriteString(data.Name) + + buf.WriteString(" (") + buf.WriteString(data.Duration.String()) + buf.WriteString(")") + + case reader.LOG_EVENT: + buf.WriteString(data.Message) + + case reader.LOGF_EVENT: + buf.WriteString(data.Message) + + case reader.MODIFY_ATTR: + buf.WriteString("modify attr") + case reader.RECORD_STATS: + buf.WriteString("record") + + for _, s := range data.Stats { + f(false)(s.Measure.V(s.Value)) + + buf.WriteString(" {") + i := 0 + s.Tags.Foreach(func(kv core.KeyValue) bool { + if i != 0 { + buf.WriteString(",") + } + i++ + buf.WriteString(kv.Key.Name()) + buf.WriteString("=") + buf.WriteString(kv.Value.Emit()) + return true + }) + buf.WriteString("}") + } + default: + buf.WriteString(fmt.Sprintf("WAT? %d", data.Type)) + } + + // Attach the scope (span) attributes and context tags. + buf.WriteString(" [") + if data.Attributes != nil { + data.Attributes.Foreach(f(false)) + } + if data.Tags != nil { + data.Tags.Foreach(f(true)) + } + if data.SpanContext.HasSpanID() { + f(false)(trace.SpanIDKey.String(data.SpanContext.SpanIDString())) + } + if data.SpanContext.HasTraceID() { + f(false)(trace.TraceIDKey.String(data.SpanContext.TraceIDString())) + } + + buf.WriteString(" ]\n") +} + +func EventToString(data reader.Event) string { + var buf strings.Builder + AppendEvent(&buf, data) + return buf.String() +} diff --git a/exporter/reader/reader.go b/exporter/reader/reader.go new file mode 100644 index 000000000..3218617e0 --- /dev/null +++ b/exporter/reader/reader.go @@ -0,0 +1,332 @@ +package reader + +import ( + "fmt" + "sync" + "time" + + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/api/metric" + "github.com/open-telemetry/opentelemetry-go/api/tag" + "github.com/open-telemetry/opentelemetry-go/api/trace" + "github.com/open-telemetry/opentelemetry-go/api/unit" + "github.com/open-telemetry/opentelemetry-go/exporter/observer" +) + +type ( + Reader interface { + Read(Event) + } + + EventType int + + Event struct { + Type EventType + Time time.Time + Sequence core.EventID + SpanContext core.SpanContext + Tags tag.Map + Attributes tag.Map + Stats []Measurement + + Parent core.SpanContext + ParentAttributes tag.Map + + Duration time.Duration + Name string + Message string + } + + Measurement struct { + Measure core.Measure + Value float64 + Tags tag.Map + } + + readerObserver struct { + readers []Reader + + // core.EventID -> *readerSpan or *readerScope + scopes sync.Map + + // core.EventID -> *readerMeasure + measures sync.Map + + // core.EventID -> *readerMetric + metrics sync.Map + } + + readerSpan struct { + name string + start time.Time + startTags tag.Map + spanContext core.SpanContext + + *readerScope + } + + readerMeasure struct { + name string + desc string + unit unit.Unit + } + + readerMetric struct { + *readerMeasure + mtype metric.MetricType + fields []core.Measure + } + + readerScope struct { + span *readerSpan + parent core.EventID + attributes tag.Map + } +) + +const ( + INVALID EventType = iota + START_SPAN + FINISH_SPAN + LOG_EVENT + LOGF_EVENT + MODIFY_ATTR + RECORD_STATS +) + +// NewReaderObserver returns an implementation that computes the +// necessary state needed by a reader to process events in memory. +// Practically, this means tracking live metric handles and scope +// attribute sets. +func NewReaderObserver(readers ...Reader) observer.Observer { + return &readerObserver{ + readers: readers, + } +} + +func (ro *readerObserver) Observe(event observer.Event) { + read := Event{ + Time: event.Time, + Sequence: event.Sequence, + Attributes: tag.EmptyMap, + Tags: tag.EmptyMap, + } + + if event.Context != nil { + read.Tags = tag.FromContext(event.Context) + } + + switch event.Type { + case observer.START_SPAN: + // Save the span context tags, initial attributes, start time, and name. + span := &readerSpan{ + name: event.String, + start: event.Time, + startTags: tag.FromContext(event.Context), + spanContext: event.Scope.SpanContext, + readerScope: &readerScope{}, + } + + rattrs, _ := ro.readScope(event.Scope) + + span.readerScope.span = span + span.readerScope.attributes = rattrs + + read.Name = span.name + read.Type = START_SPAN + read.SpanContext = span.spanContext + read.Attributes = rattrs + + if event.Parent.EventID == 0 && event.Parent.HasTraceID() { + // Remote parent + read.Parent = event.Parent.SpanContext + + // Note: No parent attributes in the event for remote parents. + } else { + pattrs, pspan := ro.readScope(event.Parent) + + if pspan != nil { + // Local parent + read.Parent = pspan.spanContext + read.ParentAttributes = pattrs + } + } + + ro.scopes.Store(event.Sequence, span) + + case observer.FINISH_SPAN: + attrs, span := ro.readScope(event.Scope) + if span == nil { + panic("span not found") + } + + read.Name = span.name + read.Type = FINISH_SPAN + + read.Attributes = attrs + read.Duration = event.Time.Sub(span.start) + read.Tags = span.startTags + read.SpanContext = span.spanContext + + // TODO: recovered + + case observer.NEW_SCOPE, observer.MODIFY_ATTR: + var span *readerSpan + var m tag.Map + var sid core.ScopeID + + if event.Scope.EventID == 0 { + // TODO: This is racey. Do this at the call + // site via Resources. + sid = trace.GlobalTracer().ScopeID() + } else { + sid = event.Scope + } + if sid.EventID == 0 { + m = tag.EmptyMap + } else { + parentI, has := ro.scopes.Load(sid.EventID) + if !has { + panic("parent scope not found") + } + if parent, ok := parentI.(*readerScope); ok { + m = parent.attributes + span = parent.span + } else if parent, ok := parentI.(*readerSpan); ok { + m = parent.attributes + span = parent + } + } + + sc := &readerScope{ + span: span, + parent: sid.EventID, + attributes: m.Apply( + event.Attribute, + event.Attributes, + event.Mutator, + event.Mutators, + ), + } + + ro.scopes.Store(event.Sequence, sc) + + if event.Type == observer.NEW_SCOPE { + return + } + + read.Type = MODIFY_ATTR + read.Attributes = sc.attributes + + if span != nil { + read.SpanContext = span.spanContext + read.Tags = span.startTags + } + + case observer.NEW_MEASURE: + measure := &readerMeasure{ + name: event.String, + } + ro.measures.Store(event.Sequence, measure) + return + + case observer.NEW_METRIC: + measureI, has := ro.measures.Load(event.Scope.EventID) + if !has { + panic("metric measure not found") + } + metric := &readerMetric{ + readerMeasure: measureI.(*readerMeasure), + } + ro.metrics.Store(event.Sequence, metric) + return + + case observer.LOG_EVENT: + read.Type = LOG_EVENT + + read.Message = event.String + + attrs, span := ro.readScope(event.Scope) + read.Attributes = attrs.Apply(core.KeyValue{}, event.Attributes, core.Mutator{}, nil) + if span != nil { + read.SpanContext = span.spanContext + } + + case observer.LOGF_EVENT: + // TODO: this can't be done lazily, must be done before Record() + read.Message = fmt.Sprintf(event.String, event.Arguments...) + + read.Type = LOGF_EVENT + attrs, span := ro.readScope(event.Scope) + read.Attributes = attrs + if span != nil { + read.SpanContext = span.spanContext + } + + case observer.RECORD_STATS: + read.Type = RECORD_STATS + + _, span := ro.readScope(event.Scope) + if span != nil { + read.SpanContext = span.spanContext + } + for _, es := range event.Stats { + ro.addMeasurement(&read, es) + } + if event.Stat.Measure != nil { + ro.addMeasurement(&read, event.Stat) + } + + default: + panic(fmt.Sprint("Unhandled case: ", event.Type)) + } + + for _, reader := range ro.readers { + reader.Read(read) + } + + if event.Type == observer.FINISH_SPAN { + ro.cleanupSpan(event.Scope.EventID) + } +} + +func (ro *readerObserver) addMeasurement(e *Event, m core.Measurement) { + attrs, _ := ro.readScope(m.ScopeID) + e.Stats = append(e.Stats, Measurement{ + Measure: m.Measure, + Value: m.Value, + Tags: attrs, + }) +} + +func (ro *readerObserver) readScope(id core.ScopeID) (tag.Map, *readerSpan) { + if id.EventID == 0 { + return tag.EmptyMap, nil + } + ev, has := ro.scopes.Load(id.EventID) + if !has { + panic(fmt.Sprintln("scope not found", id.EventID)) + } + if sp, ok := ev.(*readerScope); ok { + return sp.attributes, sp.span + } else if sp, ok := ev.(*readerSpan); ok { + return sp.attributes, sp + } + return tag.EmptyMap, nil +} + +func (ro *readerObserver) cleanupSpan(id core.EventID) { + for id != 0 { + ev, has := ro.scopes.Load(id) + if !has { + panic(fmt.Sprintln("scope not found", id)) + } + ro.scopes.Delete(id) + + if sp, ok := ev.(*readerScope); ok { + id = sp.parent + } else if sp, ok := ev.(*readerSpan); ok { + id = sp.parent + } + } +} diff --git a/exporter/spandata/format/format.go b/exporter/spandata/format/format.go new file mode 100644 index 000000000..101751837 --- /dev/null +++ b/exporter/spandata/format/format.go @@ -0,0 +1,20 @@ +package format + +import ( + "strings" + + "github.com/open-telemetry/opentelemetry-go/exporter/reader/format" + "github.com/open-telemetry/opentelemetry-go/exporter/spandata" +) + +func AppendSpan(buf *strings.Builder, data *spandata.Span) { + for _, event := range data.Events { + format.AppendEvent(buf, event) + } +} + +func SpanToString(data *spandata.Span) string { + var buf strings.Builder + AppendSpan(&buf, data) + return buf.String() +} diff --git a/exporter/spandata/spandata.go b/exporter/spandata/spandata.go new file mode 100644 index 000000000..2ac548513 --- /dev/null +++ b/exporter/spandata/spandata.go @@ -0,0 +1,55 @@ +package spandata + +import ( + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/exporter/observer" + "github.com/open-telemetry/opentelemetry-go/exporter/reader" +) + +type ( + Reader interface { + Read(*Span) + } + + Span struct { + Events []reader.Event + } + + spanReader struct { + spans map[core.SpanContext]*Span + readers []Reader + } +) + +func NewReaderObserver(readers ...Reader) observer.Observer { + return reader.NewReaderObserver(&spanReader{ + spans: map[core.SpanContext]*Span{}, + readers: readers, + }) +} + +func (s *spanReader) Read(data reader.Event) { + if !data.SpanContext.HasSpanID() { + return + } + var span *Span + if data.Type == reader.START_SPAN { + span = &Span{Events: make([]reader.Event, 0, 4)} + s.spans[data.SpanContext] = span + } else { + span = s.spans[data.SpanContext] + if span == nil { + // TODO count and report this. + return + } + } + + span.Events = append(span.Events, data) + + if data.Type == reader.FINISH_SPAN { + for _, r := range s.readers { + r.Read(span) + } + delete(s.spans, data.SpanContext) + } +} diff --git a/exporter/spanlog/install/package.go b/exporter/spanlog/install/package.go new file mode 100644 index 000000000..8622483a5 --- /dev/null +++ b/exporter/spanlog/install/package.go @@ -0,0 +1,16 @@ +package install + +import ( + "github.com/open-telemetry/opentelemetry-go/exporter/observer" + "github.com/open-telemetry/opentelemetry-go/exporter/spanlog" +) + +// Use this import: +// +// import _ "github.com/open-telemetry/opentelemetry-go/exporter/spanlog/install" +// +// to include the spanlog exporter by default. + +func init() { + observer.RegisterObserver(spanlog.New()) +} diff --git a/exporter/spanlog/plugin/Makefile b/exporter/spanlog/plugin/Makefile new file mode 100644 index 000000000..3e358344a --- /dev/null +++ b/exporter/spanlog/plugin/Makefile @@ -0,0 +1,4 @@ +.PHONY: module + +module: + go build -buildmode=plugin -o spanlog.so package.go diff --git a/exporter/spanlog/plugin/package.go b/exporter/spanlog/plugin/package.go new file mode 100644 index 000000000..1c2c51ee9 --- /dev/null +++ b/exporter/spanlog/plugin/package.go @@ -0,0 +1,10 @@ +package main + +import "github.com/open-telemetry/opentelemetry-go/exporter/spanlog" + +var ( + Observer = spanlog.New() +) + +func main() { +} diff --git a/exporter/spanlog/spanlog.go b/exporter/spanlog/spanlog.go new file mode 100644 index 000000000..7d776c038 --- /dev/null +++ b/exporter/spanlog/spanlog.go @@ -0,0 +1,26 @@ +package spanlog + +import ( + "os" + "strings" + + "github.com/open-telemetry/opentelemetry-go/exporter/buffer" + "github.com/open-telemetry/opentelemetry-go/exporter/observer" + "github.com/open-telemetry/opentelemetry-go/exporter/spandata" + "github.com/open-telemetry/opentelemetry-go/exporter/spandata/format" +) + +type ( + spanLog struct{} +) + +func New() observer.Observer { + return buffer.NewBuffer(1000, spandata.NewReaderObserver(&spanLog{})) +} + +func (s *spanLog) Read(data *spandata.Span) { + var buf strings.Builder + buf.WriteString("----------------------------------------------------------------------\n") + format.AppendSpan(&buf, data) + os.Stdout.WriteString(buf.String()) +} diff --git a/exporter/stderr/install/package.go b/exporter/stderr/install/package.go new file mode 100644 index 000000000..50d08137d --- /dev/null +++ b/exporter/stderr/install/package.go @@ -0,0 +1,16 @@ +package install + +import ( + "github.com/open-telemetry/opentelemetry-go/exporter/observer" + "github.com/open-telemetry/opentelemetry-go/exporter/stderr" +) + +// Use this import: +// +// import _ "github.com/open-telemetry/opentelemetry-go/exporter/stderr/install" +// +// to include the stderr exporter by default. + +func init() { + observer.RegisterObserver(stderr.New()) +} diff --git a/exporter/stderr/plugin/Makefile b/exporter/stderr/plugin/Makefile new file mode 100644 index 000000000..8091a854e --- /dev/null +++ b/exporter/stderr/plugin/Makefile @@ -0,0 +1,4 @@ +.PHONY: module + +module: + go build -buildmode=plugin -o stderr.so package.go diff --git a/exporter/stderr/plugin/package.go b/exporter/stderr/plugin/package.go new file mode 100644 index 000000000..b3bd041cf --- /dev/null +++ b/exporter/stderr/plugin/package.go @@ -0,0 +1,10 @@ +package main + +import "github.com/open-telemetry/opentelemetry-go/exporter/stderr" + +var ( + Observer = stderr.New() +) + +func main() { +} diff --git a/exporter/stderr/stderr.go b/exporter/stderr/stderr.go new file mode 100644 index 000000000..1a4585cab --- /dev/null +++ b/exporter/stderr/stderr.go @@ -0,0 +1,21 @@ +package stderr + +import ( + "os" + + "github.com/open-telemetry/opentelemetry-go/exporter/observer" + "github.com/open-telemetry/opentelemetry-go/exporter/reader" + "github.com/open-telemetry/opentelemetry-go/exporter/reader/format" +) + +type ( + stderrLog struct{} +) + +func New() observer.Observer { + return reader.NewReaderObserver(&stderrLog{}) +} + +func (s *stderrLog) Read(data reader.Event) { + os.Stderr.WriteString(format.EventToString(data)) +} diff --git a/exporter/stdout/install/package.go b/exporter/stdout/install/package.go new file mode 100644 index 000000000..382578710 --- /dev/null +++ b/exporter/stdout/install/package.go @@ -0,0 +1,16 @@ +package install + +import ( + "github.com/open-telemetry/opentelemetry-go/exporter/observer" + "github.com/open-telemetry/opentelemetry-go/exporter/stdout" +) + +// Use this import: +// +// import _ "github.com/open-telemetry/opentelemetry-go/exporter/stdout/install" +// +// to include the stderr exporter by default. + +func init() { + observer.RegisterObserver(stdout.New()) +} diff --git a/exporter/stdout/plugin/Makefile b/exporter/stdout/plugin/Makefile new file mode 100644 index 000000000..23cd0b604 --- /dev/null +++ b/exporter/stdout/plugin/Makefile @@ -0,0 +1,4 @@ +.PHONY: module + +module: + go build -buildmode=plugin -o stdout.so package.go diff --git a/exporter/stdout/plugin/package.go b/exporter/stdout/plugin/package.go new file mode 100644 index 000000000..f3f455cc1 --- /dev/null +++ b/exporter/stdout/plugin/package.go @@ -0,0 +1,10 @@ +package main + +import "github.com/open-telemetry/opentelemetry-go/exporter/stdout" + +var ( + Observer = stdout.New() +) + +func main() { +} diff --git a/exporter/stdout/stdout.go b/exporter/stdout/stdout.go new file mode 100644 index 000000000..2fc304683 --- /dev/null +++ b/exporter/stdout/stdout.go @@ -0,0 +1,21 @@ +package stdout + +import ( + "os" + + "github.com/open-telemetry/opentelemetry-go/exporter/observer" + "github.com/open-telemetry/opentelemetry-go/exporter/reader" + "github.com/open-telemetry/opentelemetry-go/exporter/reader/format" +) + +type ( + stdoutLog struct{} +) + +func New() observer.Observer { + return reader.NewReaderObserver(&stdoutLog{}) +} + +func (s *stdoutLog) Read(data reader.Event) { + os.Stdout.WriteString(format.EventToString(data)) +} diff --git a/plugin/httptrace/api.go b/plugin/httptrace/api.go new file mode 100644 index 000000000..6443c19e6 --- /dev/null +++ b/plugin/httptrace/api.go @@ -0,0 +1,35 @@ +package httptrace + +import ( + "context" + "net/http" + "net/http/httptrace" + + "github.com/open-telemetry/opentelemetry-go/api/trace" +) + +// Client +func W3C(ctx context.Context, req *http.Request) (context.Context, *http.Request, trace.Injector) { + t := newClientTracer(ctx) + + t.GetConn = t.getConn + t.GotConn = t.gotConn + t.PutIdleConn = t.putIdleConn + t.GotFirstResponseByte = t.gotFirstResponseByte + t.Got100Continue = t.got100Continue + t.Got1xxResponse = t.got1xxResponse + t.DNSStart = t.dnsStart + t.DNSDone = t.dnsDone + t.ConnectStart = t.connectStart + t.ConnectDone = t.connectDone + t.TLSHandshakeStart = t.tlsHandshakeStart + t.TLSHandshakeDone = t.tlsHandshakeDone + t.WroteHeaderField = t.wroteHeaderField + t.WroteHeaders = t.wroteHeaders + t.Wait100Continue = t.wait100Continue + t.WroteRequest = t.wroteRequest + + ctx = httptrace.WithClientTrace(ctx, &t.ClientTrace) + req = req.WithContext(ctx) + return ctx, req, hinjector{req} +} diff --git a/plugin/httptrace/clienttrace.go b/plugin/httptrace/clienttrace.go new file mode 100644 index 000000000..02c15add0 --- /dev/null +++ b/plugin/httptrace/clienttrace.go @@ -0,0 +1,174 @@ +package httptrace + +import ( + "context" + "crypto/tls" + "net/http/httptrace" + "net/textproto" + "strings" + + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/api/tag" + "github.com/open-telemetry/opentelemetry-go/api/trace" +) + +type ( + clientLevel struct { + trace.Span + ident string + } + + clientTracer struct { + context.Context + httptrace.ClientTrace + + levels []clientLevel + } +) + +var ( + HTTPStatus = tag.New("http.status") + HTTPHeaderMIME = tag.New("http.mime") + HTTPRemoteAddr = tag.New("http.remote") + HTTPLocalAddr = tag.New("http.local") +) + +func newClientTracer(ctx context.Context) *clientTracer { + ct := &clientTracer{ + Context: ctx, + } + ct.open("http.request") + return ct +} + +func (ct *clientTracer) open(name string, attrs ...core.KeyValue) { + _, sp := trace.Start(ct.Context, name, trace.WithAttributes(attrs...)) + ct.levels = append(ct.levels, clientLevel{ + Span: sp, + ident: name, + }) +} + +func (ct *clientTracer) close(name string) { + if len(ct.levels) == 0 { + panic("remove me") + } + l := len(ct.levels) + ct.levels[l-1].Finish() + ct.levels = ct.levels[0 : l-1] +} + +func (ct *clientTracer) current() trace.Span { + return ct.levels[len(ct.levels)-1].Span +} + +func (ct *clientTracer) currentName() string { + if len(ct.levels) == 0 { + return "" + } + return ct.levels[len(ct.levels)-1].ident +} + +func (ct *clientTracer) getConn(host string) { + ct.open("http.getconn", HostKey.String(host)) +} + +func (ct *clientTracer) gotConn(info httptrace.GotConnInfo) { + ct.current().SetAttribute(HTTPRemoteAddr.String(info.Conn.RemoteAddr().String())) + ct.current().SetAttribute(HTTPLocalAddr.String(info.Conn.LocalAddr().String())) + + ct.close("http.getconn") +} + +func (ct *clientTracer) putIdleConn(err error) { + if err != nil { + ct.current().SetAttribute(trace.MessageKey.String(err.Error())) + ct.current().SetError(true) + } + ct.close("http.receive") +} + +func (ct *clientTracer) gotFirstResponseByte() { + ct.open("http.receive") +} + +func (ct *clientTracer) dnsStart(httptrace.DNSStartInfo) { + ct.open("http.dns") +} + +func (ct *clientTracer) dnsDone(httptrace.DNSDoneInfo) { + ct.close("http.dns") +} + +func (ct *clientTracer) connectStart(network, addr string) { + ct.open("http.connect") +} + +func (ct *clientTracer) connectDone(network, addr string, err error) { + ct.close("http.connect") +} + +func (ct *clientTracer) tlsHandshakeStart() { + ct.open("http.tls") +} + +func (ct *clientTracer) tlsHandshakeDone(tls.ConnectionState, error) { + ct.close("http.tls") +} + +func (ct *clientTracer) wroteHeaderField(key string, value []string) { + if ct.currentName() != "http.headers" { + ct.open("http.headers") + } + ct.levels[0].SetAttribute(tag.New("http." + strings.ToLower(key)).String(sa2s(value))) +} + +func (ct *clientTracer) wroteHeaders() { + ct.open("http.send") +} + +func (ct *clientTracer) wroteRequest(info httptrace.WroteRequestInfo) { + if info.Err != nil { + ct.levels[0].SetAttribute(trace.MessageKey.String(info.Err.Error())) + ct.levels[0].SetError(true) + } + ct.close("http.send") +} + +func (ct *clientTracer) got100Continue() { + ct.current().Log(ct.Context, "GOT 100 - Continue") +} + +func (ct *clientTracer) wait100Continue() { + ct.current().Log(ct.Context, "GOT 100 - Wait") +} + +func (ct *clientTracer) got1xxResponse(code int, header textproto.MIMEHeader) error { + ct.current().Log(ct.Context, "GOT 1xx", + HTTPStatus.Int(code), + HTTPHeaderMIME.String(sm2s(header)), + ) + return nil +} + +func sa2s(value []string) string { + if len(value) == 1 { + return value[0] + } else if len(value) == 0 { + return "undefined" + } + return strings.Join(value, ",") +} + +func sm2s(value map[string][]string) string { + var buf strings.Builder + for k, v := range value { + if buf.Len() != 0 { + buf.WriteString(",") + } + buf.WriteString(k) + buf.WriteString("=") + buf.WriteString(sa2s(v)) + } + return buf.String() +} diff --git a/plugin/httptrace/httptrace.go b/plugin/httptrace/httptrace.go new file mode 100644 index 000000000..464580836 --- /dev/null +++ b/plugin/httptrace/httptrace.go @@ -0,0 +1,87 @@ +package httptrace + +import ( + "encoding/binary" + "net/http" + + "github.com/open-telemetry/opentelemetry-go/api/core" + "github.com/open-telemetry/opentelemetry-go/api/tag" + "github.com/lightstep/tracecontext.go" + "github.com/lightstep/tracecontext.go/tracestate" +) + +const ( + Vendor = "ot" +) + +type ( + hinjector struct { + *http.Request + } +) + +var ( + HostKey = tag.New("http.host") + URLKey = tag.New("http.url") + + encoding = binary.BigEndian +) + +// Returns the Attributes, Context Tags, and SpanContext that were encoded by Inject. +func Extract(req *http.Request) ([]core.KeyValue, []core.KeyValue, core.SpanContext) { + tc, err := tracecontext.FromHeaders(req.Header) + + if err != nil { + return nil, nil, core.SpanContext{} + } + + var sc core.SpanContext + sc.SpanID = encoding.Uint64(tc.TraceParent.SpanID[0:8]) + sc.TraceIDHigh = encoding.Uint64(tc.TraceParent.TraceID[0:8]) + sc.TraceIDLow = encoding.Uint64(tc.TraceParent.TraceID[8:16]) + + attrs := []core.KeyValue{ + URLKey.String(req.URL.String()), + // Etc. + } + + var tags []core.KeyValue + + for _, ts := range tc.TraceState { + if ts.Vendor != Vendor { + continue + } + // TODO: max-hops, type conversion questions answered, + // case-conversion questions. + tags = append(tags, tag.New(ts.Tenant).String(ts.Value)) + } + + return attrs, tags, sc +} + +func (h hinjector) Inject(sc core.SpanContext, tags tag.Map) { + var tc tracecontext.TraceContext + var sid [8]byte + var tid [16]byte + + encoding.PutUint64(sid[0:8], sc.SpanID) + encoding.PutUint64(tid[0:8], sc.TraceIDHigh) + encoding.PutUint64(tid[8:16], sc.TraceIDLow) + + tc.TraceParent.Version = tracecontext.Version + tc.TraceParent.TraceID = tid + tc.TraceParent.SpanID = sid + tc.TraceParent.Flags.Recorded = true // Note: not implemented. + + tags.Foreach(func(kv core.KeyValue) bool { + // TODO: implement MaxHops + tc.TraceState = append(tc.TraceState, tracestate.Member{ + Vendor: Vendor, + Tenant: kv.Key.Name(), + Value: kv.Value.Emit(), + }) + return true + }) + + tc.SetHeaders(h.Header) +}