mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-02-03 13:11:53 +02:00
Merge branch 'master' into send-headers
This commit is contained in:
commit
9ce42c89c5
1
.gitignore
vendored
1
.gitignore
vendored
@ -15,6 +15,7 @@ coverage.*
|
||||
/example/http/server/server
|
||||
/example/jaeger/jaeger
|
||||
/example/namedtracer/namedtracer
|
||||
/example/otlp/otel-test
|
||||
/example/prometheus/prometheus
|
||||
/example/zipkin/zipkin
|
||||
/example/otel-collector/otel-collector
|
||||
|
@ -71,7 +71,7 @@ func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggrega
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*benchFixture) Process(context.Context, export.Record) error {
|
||||
func (*benchFixture) Process(export.Record) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -75,7 +75,7 @@ func (p *traceProvider) setDelegate(provider trace.Provider) {
|
||||
}
|
||||
|
||||
// Tracer implements trace.Provider.
|
||||
func (p *traceProvider) Tracer(name string) trace.Tracer {
|
||||
func (p *traceProvider) Tracer(name string, opts ...trace.TracerOption) trace.Tracer {
|
||||
p.mtx.Lock()
|
||||
defer p.mtx.Unlock()
|
||||
|
||||
@ -83,7 +83,7 @@ func (p *traceProvider) Tracer(name string) trace.Tracer {
|
||||
return p.delegate.Tracer(name)
|
||||
}
|
||||
|
||||
t := &tracer{name: name}
|
||||
t := &tracer{name: name, opts: opts}
|
||||
p.tracers = append(p.tracers, t)
|
||||
return t
|
||||
}
|
||||
@ -95,6 +95,7 @@ func (p *traceProvider) Tracer(name string) trace.Tracer {
|
||||
type tracer struct {
|
||||
once sync.Once
|
||||
name string
|
||||
opts []trace.TracerOption
|
||||
|
||||
delegate trace.Tracer
|
||||
}
|
||||
@ -110,7 +111,7 @@ var _ trace.Tracer = &tracer{}
|
||||
// Delegation only happens on the first call to this method. All subsequent
|
||||
// calls result in no delegation changes.
|
||||
func (t *tracer) setDelegate(provider trace.Provider) {
|
||||
t.once.Do(func() { t.delegate = provider.Tracer(t.name) })
|
||||
t.once.Do(func() { t.delegate = provider.Tracer(t.name, t.opts...) })
|
||||
}
|
||||
|
||||
// WithSpan implements trace.Tracer by forwarding the call to t.delegate if
|
||||
|
@ -25,7 +25,7 @@ type testTraceProvider struct{}
|
||||
|
||||
var _ trace.Provider = &testTraceProvider{}
|
||||
|
||||
func (*testTraceProvider) Tracer(_ string) trace.Tracer {
|
||||
func (*testTraceProvider) Tracer(_ string, _ ...trace.TracerOption) trace.Tracer {
|
||||
return &trace.NoopTracer{}
|
||||
}
|
||||
|
||||
|
@ -24,9 +24,31 @@ import (
|
||||
)
|
||||
|
||||
type Provider interface {
|
||||
// Tracer creates a named tracer that implements Tracer interface.
|
||||
// If the name is an empty string then provider uses default name.
|
||||
Tracer(name string) Tracer
|
||||
// Tracer creates an implementation of the Tracer interface.
|
||||
// The instrumentationName must be the name of the library providing
|
||||
// instrumentation. This name may be the same as the instrumented code
|
||||
// only if that code provides built-in instrumentation. If the
|
||||
// instrumentationName is empty, then a implementation defined default
|
||||
// name will be used instead.
|
||||
Tracer(instrumentationName string, opts ...TracerOption) Tracer
|
||||
}
|
||||
|
||||
// TODO (MrAlias): unify this API option design:
|
||||
// https://github.com/open-telemetry/opentelemetry-go/issues/536
|
||||
|
||||
// TracerConfig contains options for a Tracer.
|
||||
type TracerConfig struct {
|
||||
InstrumentationVersion string
|
||||
}
|
||||
|
||||
// TracerOption configures a TracerConfig option.
|
||||
type TracerOption func(*TracerConfig)
|
||||
|
||||
// WithInstrumentationVersion sets the instrumentation version for a Tracer.
|
||||
func WithInstrumentationVersion(version string) TracerOption {
|
||||
return func(c *TracerConfig) {
|
||||
c.InstrumentationVersion = version
|
||||
}
|
||||
}
|
||||
|
||||
type Tracer interface {
|
||||
|
@ -19,6 +19,6 @@ type NoopProvider struct{}
|
||||
var _ Provider = NoopProvider{}
|
||||
|
||||
// Tracer returns noop implementation of Tracer.
|
||||
func (p NoopProvider) Tracer(name string) Tracer {
|
||||
func (p NoopProvider) Tracer(_ string, _ ...TracerOption) Tracer {
|
||||
return NoopTracer{}
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ type WrapperProvider struct {
|
||||
var _ oteltrace.Provider = (*WrapperProvider)(nil)
|
||||
|
||||
// Tracer returns the WrapperTracer associated with the WrapperProvider.
|
||||
func (p *WrapperProvider) Tracer(name string) oteltrace.Tracer {
|
||||
func (p *WrapperProvider) Tracer(_ string, _ ...oteltrace.TracerOption) oteltrace.Tracer {
|
||||
return p.wTracer
|
||||
}
|
||||
|
||||
|
80
example/otlp/README.md
Normal file
80
example/otlp/README.md
Normal file
@ -0,0 +1,80 @@
|
||||
# OTLP Example
|
||||
This example demonstrates how to export trace and metric data from an
|
||||
application using OpenTelemetry's own wire protocol
|
||||
[OTLP](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/protocol/README.md).
|
||||
We will also walk you through configuring a collector to accept OTLP exports.
|
||||
|
||||
### How to run?
|
||||
|
||||
#### Prequisites
|
||||
- go >=1.13 installed
|
||||
- OpenTelemetry collector is available
|
||||
|
||||
#### Configure the Collector
|
||||
Follow the instructions [on the
|
||||
website](https://opentelemetry.io/docs/collector/about/) to install a working
|
||||
instance of the collector. This example assumes you have the collector installed
|
||||
locally.
|
||||
|
||||
To configure the collector to accept OTLP traffic from our application,
|
||||
ensure that it has the following configs:
|
||||
|
||||
```yaml
|
||||
receivers:
|
||||
otlp:
|
||||
endpoint: 0.0.0.0:55680 # listens to localhost:55680
|
||||
|
||||
# potentially other receivers
|
||||
|
||||
service:
|
||||
pipelines:
|
||||
|
||||
traces:
|
||||
receivers:
|
||||
- otlp
|
||||
# potentially other receivers
|
||||
processors: # whatever processors you need
|
||||
exporters: # wherever you want your data to go
|
||||
|
||||
metrics:
|
||||
receivers:
|
||||
-otlp
|
||||
# potentially other receivers
|
||||
processors: etc
|
||||
exporters: etc
|
||||
|
||||
# other services
|
||||
```
|
||||
|
||||
An example config has been provided at
|
||||
[example-otlp-config.yaml](otlp/example-otlp-config.yaml).
|
||||
|
||||
Then to run:
|
||||
```sh
|
||||
./[YOUR_COLLECTOR_BINARY] --config [PATH_TO_CONFIG]
|
||||
```
|
||||
|
||||
If you use the example config, it's set to export to `stdout`. If you run
|
||||
the collector on the same machine as the example application, you should
|
||||
see trace and metric outputs from the collector.
|
||||
|
||||
#### Start the Application
|
||||
An example application is included in this directory. It simulates the process
|
||||
of scribing a spell scroll (e.g. in [D&D](https://roll20.net/compendium/dnd5e/Spell%20Scroll#content)).
|
||||
The application has been instrumented and exports both trace and metric data
|
||||
via OTLP to any listening receiver. To run it:
|
||||
|
||||
```sh
|
||||
go get -d go.opentelemetry.io/otel
|
||||
cd $GOPATH/go.opentelemetry.io/otel/example/otlp
|
||||
go run main.go
|
||||
```
|
||||
|
||||
The application is currently configured to transmit exported data to
|
||||
`localhost:55680`. See [main.go](otlp/main.go) for full details.
|
||||
|
||||
After starting the application, you should see trace and metric log output
|
||||
on the collector.
|
||||
|
||||
Note, if the receiver is incorrectly configured to take in metric data, the
|
||||
application may complain about being unable to connect.
|
29
example/otlp/example-otlp-config.yaml
Normal file
29
example/otlp/example-otlp-config.yaml
Normal file
@ -0,0 +1,29 @@
|
||||
extensions:
|
||||
health_check:
|
||||
|
||||
receivers:
|
||||
otlp:
|
||||
endpoint: 0.0.0.0:55680
|
||||
|
||||
processors:
|
||||
batch:
|
||||
queued_retry:
|
||||
|
||||
exporters:
|
||||
logging:
|
||||
loglevel: debug
|
||||
|
||||
service:
|
||||
|
||||
pipelines:
|
||||
|
||||
traces:
|
||||
receivers: [otlp]
|
||||
processors: [batch, queued_retry]
|
||||
exporters: [logging]
|
||||
|
||||
metrics:
|
||||
receivers: [otlp]
|
||||
exporters: [logging]
|
||||
|
||||
extensions: [health_check]
|
13
example/otlp/go.mod
Normal file
13
example/otlp/go.mod
Normal file
@ -0,0 +1,13 @@
|
||||
module go.opentelemetry.io/otel/example/otel-test
|
||||
|
||||
go 1.13
|
||||
|
||||
replace (
|
||||
go.opentelemetry.io/otel => ../..
|
||||
go.opentelemetry.io/otel/exporters/otlp => ../../exporters/otlp
|
||||
)
|
||||
|
||||
require (
|
||||
go.opentelemetry.io/otel v0.6.0
|
||||
go.opentelemetry.io/otel/exporters/otlp v0.6.0
|
||||
)
|
109
example/otlp/go.sum
Normal file
109
example/otlp/go.sum
Normal file
@ -0,0 +1,109 @@
|
||||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7 h1:qELHH0AWCvf98Yf+CNIJx9vOZOfHFDDzgDRYsnNk/vs=
|
||||
github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7/go.mod h1:Q5DbzQ+3AkgGwymQO7aZFNP7ns2lZKGtvRBzRXfdi60=
|
||||
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
|
||||
github.com/benbjohnson/clock v1.0.0 h1:78Jk/r6m4wCi6sndMpty7A//t4dw/RW5fV4ZgDVfX1w=
|
||||
github.com/benbjohnson/clock v1.0.0/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
|
||||
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
|
||||
github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls=
|
||||
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
|
||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.14.3 h1:OCJlWkOUoTnl0neNGlf4fUm3TmbEtguw7vR+nGtnDjY=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.14.3/go.mod h1:6CwZWGDSPRJidgKAtJVvND6soZe6fT7iteq8wDPdhb0=
|
||||
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
|
||||
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/open-telemetry/opentelemetry-proto v0.3.0 h1:+ASAtcayvoELyCF40+rdCMlBOhZIn5TPDez85zSYc30=
|
||||
github.com/open-telemetry/opentelemetry-proto v0.3.0/go.mod h1:PMR5GI0F7BSpio+rBGFxNm6SLzg3FypDTcFuQZnO+F8=
|
||||
github.com/opentracing/opentracing-go v1.1.1-0.20190913142402-a7454ce5950e/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20191002035440-2ec189313ef0 h1:2mqDk8w/o6UmeUCu5Qiq2y7iMf6anbx+YA8d1JFoFrs=
|
||||
golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
||||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
|
||||
google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
|
||||
google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03 h1:4HYDjxeNXAOTv3o1N2tjo8UUSlhQgAD52FVkwxnWgM8=
|
||||
google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
|
||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
|
||||
google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA=
|
||||
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
|
||||
google.golang.org/grpc v1.27.1 h1:zvIju4sqAGvwKspUQOhwnpcqSbzi7/H6QomNNjTL4sk=
|
||||
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
|
||||
google.golang.org/grpc v1.29.1 h1:EC2SB8S04d2r73uptxphDSUG+kTKVgjRPF+N3xpxRB4=
|
||||
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
|
||||
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
|
||||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
116
example/otlp/main.go
Normal file
116
example/otlp/main.go
Normal file
@ -0,0 +1,116 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Example application showcasing opentelemetry Go using the OTLP wire
|
||||
// protocol
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/api/global"
|
||||
"go.opentelemetry.io/otel/api/kv"
|
||||
"go.opentelemetry.io/otel/api/metric"
|
||||
"go.opentelemetry.io/otel/exporters/otlp"
|
||||
"go.opentelemetry.io/otel/sdk/metric/controller/push"
|
||||
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
)
|
||||
|
||||
// Initializes an OTLP exporter, and configures the corresponding trace and
|
||||
// metric providers.
|
||||
func initProvider() (*otlp.Exporter, *push.Controller) {
|
||||
exp, err := otlp.NewExporter(
|
||||
otlp.WithInsecure(),
|
||||
otlp.WithAddress("localhost:55680"),
|
||||
)
|
||||
handleErr(err, "Failed to create exporter: $v")
|
||||
|
||||
traceProvider, err := sdktrace.NewProvider(
|
||||
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
|
||||
sdktrace.WithSyncer(exp),
|
||||
)
|
||||
handleErr(err, "Failed to create trace provider: %v")
|
||||
|
||||
pusher := push.New(
|
||||
simple.NewWithExactDistribution(),
|
||||
exp,
|
||||
push.WithStateful(true),
|
||||
push.WithPeriod(2*time.Second),
|
||||
)
|
||||
|
||||
global.SetTraceProvider(traceProvider)
|
||||
global.SetMeterProvider(pusher.Provider())
|
||||
pusher.Start()
|
||||
|
||||
return exp, pusher
|
||||
}
|
||||
|
||||
func main() {
|
||||
exp, pusher := initProvider()
|
||||
defer func() { handleErr(exp.Stop(), "Failed to stop exporter") }()
|
||||
defer pusher.Stop() // pushes any last exports to the receiver
|
||||
|
||||
tracer := global.Tracer("mage-sense")
|
||||
meter := global.Meter("mage-read")
|
||||
|
||||
// labels represent additional descriptors that can be bound to a metric
|
||||
// observer or recorder. In this case they describe the location in
|
||||
// which a spell scroll is scribed.
|
||||
commonLabels := []kv.KeyValue{
|
||||
kv.String("work-room", "East Scriptorium"),
|
||||
kv.String("occupancy", "69,105"),
|
||||
kv.String("priority", "Ultra"),
|
||||
}
|
||||
|
||||
// Observer metric example
|
||||
oneMetricCB := func(_ context.Context, result metric.Float64ObserverResult) {
|
||||
result.Observe(1, commonLabels...)
|
||||
}
|
||||
_ = metric.Must(meter).NewFloat64ValueObserver("scrying.glass.one", oneMetricCB,
|
||||
metric.WithDescription("A ValueObserver set to 1.0"),
|
||||
)
|
||||
|
||||
// Recorder metric example
|
||||
valuerecorder := metric.Must(meter).
|
||||
NewFloat64ValueRecorder("scrying.glass.two").
|
||||
Bind(commonLabels...)
|
||||
defer valuerecorder.Unbind()
|
||||
|
||||
// work begins
|
||||
ctx, span := tracer.Start(context.Background(), "Archmage-Overlord-Inspection")
|
||||
for i := 0; i < 10; i++ {
|
||||
_, innerSpan := tracer.Start(ctx, fmt.Sprintf("Minion-%d", i))
|
||||
log.Println("Minions hard at work, scribing...")
|
||||
valuerecorder.Record(ctx, float64(i)*1.5)
|
||||
|
||||
<-time.After(time.Second)
|
||||
innerSpan.End()
|
||||
}
|
||||
|
||||
span.End()
|
||||
<-time.After(time.Second)
|
||||
|
||||
log.Println("Spell-scroll scribed!")
|
||||
}
|
||||
|
||||
func handleErr(err error, message string) {
|
||||
if err != nil {
|
||||
log.Fatalf("%s: %v", message, err)
|
||||
}
|
||||
}
|
@ -101,7 +101,7 @@ func TestStdoutTimestamp(t *testing.T) {
|
||||
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Int64NumberKind)
|
||||
lvagg := lastvalue.New()
|
||||
aggtest.CheckedUpdate(t, lvagg, metric.NewInt64Number(321), &desc)
|
||||
lvagg.Checkpoint(ctx, &desc)
|
||||
lvagg.Checkpoint(&desc)
|
||||
|
||||
checkpointSet.Add(&desc, lvagg)
|
||||
|
||||
@ -146,7 +146,7 @@ func TestStdoutCounterFormat(t *testing.T) {
|
||||
desc := metric.NewDescriptor("test.name", metric.CounterKind, metric.Int64NumberKind)
|
||||
cagg := sum.New()
|
||||
aggtest.CheckedUpdate(fix.t, cagg, metric.NewInt64Number(123), &desc)
|
||||
cagg.Checkpoint(fix.ctx, &desc)
|
||||
cagg.Checkpoint(&desc)
|
||||
|
||||
checkpointSet.Add(&desc, cagg, kv.String("A", "B"), kv.String("C", "D"))
|
||||
|
||||
@ -163,7 +163,7 @@ func TestStdoutLastValueFormat(t *testing.T) {
|
||||
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
|
||||
lvagg := lastvalue.New()
|
||||
aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
|
||||
lvagg.Checkpoint(fix.ctx, &desc)
|
||||
lvagg.Checkpoint(&desc)
|
||||
|
||||
checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D"))
|
||||
|
||||
@ -181,7 +181,7 @@ func TestStdoutMinMaxSumCount(t *testing.T) {
|
||||
magg := minmaxsumcount.New(&desc)
|
||||
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(123.456), &desc)
|
||||
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(876.543), &desc)
|
||||
magg.Checkpoint(fix.ctx, &desc)
|
||||
magg.Checkpoint(&desc)
|
||||
|
||||
checkpointSet.Add(&desc, magg, kv.String("A", "B"), kv.String("C", "D"))
|
||||
|
||||
@ -204,7 +204,7 @@ func TestStdoutValueRecorderFormat(t *testing.T) {
|
||||
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(float64(i)+0.5), &desc)
|
||||
}
|
||||
|
||||
magg.Checkpoint(fix.ctx, &desc)
|
||||
magg.Checkpoint(&desc)
|
||||
|
||||
checkpointSet.Add(&desc, magg, kv.String("A", "B"), kv.String("C", "D"))
|
||||
|
||||
@ -252,7 +252,7 @@ func TestStdoutNoData(t *testing.T) {
|
||||
checkpointSet := test.NewCheckpointSet(testResource)
|
||||
|
||||
magg := tc
|
||||
magg.Checkpoint(fix.ctx, &desc)
|
||||
magg.Checkpoint(&desc)
|
||||
|
||||
checkpointSet.Add(&desc, magg)
|
||||
|
||||
@ -270,7 +270,7 @@ func TestStdoutLastValueNotSet(t *testing.T) {
|
||||
|
||||
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
|
||||
lvagg := lastvalue.New()
|
||||
lvagg.Checkpoint(fix.ctx, &desc)
|
||||
lvagg.Checkpoint(&desc)
|
||||
|
||||
checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D"))
|
||||
|
||||
@ -321,7 +321,7 @@ func TestStdoutResource(t *testing.T) {
|
||||
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
|
||||
lvagg := lastvalue.New()
|
||||
aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
|
||||
lvagg.Checkpoint(fix.ctx, &desc)
|
||||
lvagg.Checkpoint(&desc)
|
||||
|
||||
checkpointSet.Add(&desc, lvagg, tc.attrs...)
|
||||
|
||||
|
@ -105,7 +105,7 @@ func (p *CheckpointSet) updateAggregator(desc *metric.Descriptor, newAgg export.
|
||||
ctx := context.Background()
|
||||
// Updates and checkpoint the new aggregator
|
||||
_ = newAgg.Update(ctx, createNumber(desc, v), desc)
|
||||
newAgg.Checkpoint(ctx, desc)
|
||||
newAgg.Checkpoint(desc)
|
||||
|
||||
// Try to add this aggregator to the CheckpointSet
|
||||
agg, added := p.Add(desc, newAgg, labels...)
|
||||
|
@ -89,7 +89,7 @@ func TestMinMaxSumCountValue(t *testing.T) {
|
||||
assert.EqualError(t, err, aggregator.ErrNoData.Error())
|
||||
|
||||
// Checkpoint to set non-zero values
|
||||
mmsc.Checkpoint(context.Background(), &metric.Descriptor{})
|
||||
mmsc.Checkpoint(&metric.Descriptor{})
|
||||
min, max, sum, count, err := minMaxSumCountValues(mmsc)
|
||||
if assert.NoError(t, err) {
|
||||
assert.Equal(t, min, metric.NewInt64Number(1))
|
||||
@ -146,7 +146,7 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
|
||||
if !assert.NoError(t, mmsc.Update(ctx, 1, &metric.Descriptor{})) {
|
||||
return
|
||||
}
|
||||
mmsc.Checkpoint(ctx, &metric.Descriptor{})
|
||||
mmsc.Checkpoint(&metric.Descriptor{})
|
||||
for _, test := range tests {
|
||||
desc := metric.NewDescriptor(test.name, test.metricKind, test.numberKind,
|
||||
metric.WithDescription(test.description),
|
||||
@ -165,7 +165,7 @@ func TestMinMaxSumCountDatapoints(t *testing.T) {
|
||||
mmsc := minmaxsumcount.New(&desc)
|
||||
assert.NoError(t, mmsc.Update(context.Background(), 1, &desc))
|
||||
assert.NoError(t, mmsc.Update(context.Background(), 10, &desc))
|
||||
mmsc.Checkpoint(context.Background(), &desc)
|
||||
mmsc.Checkpoint(&desc)
|
||||
expected := []*metricpb.SummaryDataPoint{
|
||||
{
|
||||
Count: 2,
|
||||
@ -261,7 +261,7 @@ func TestSumInt64DataPoints(t *testing.T) {
|
||||
labels := label.NewSet()
|
||||
s := sumAgg.New()
|
||||
assert.NoError(t, s.Update(context.Background(), metric.Number(1), &desc))
|
||||
s.Checkpoint(context.Background(), &desc)
|
||||
s.Checkpoint(&desc)
|
||||
if m, err := sum(&desc, &labels, s); assert.NoError(t, err) {
|
||||
assert.Equal(t, []*metricpb.Int64DataPoint{{Value: 1}}, m.Int64DataPoints)
|
||||
assert.Equal(t, []*metricpb.DoubleDataPoint(nil), m.DoubleDataPoints)
|
||||
@ -275,7 +275,7 @@ func TestSumFloat64DataPoints(t *testing.T) {
|
||||
labels := label.NewSet()
|
||||
s := sumAgg.New()
|
||||
assert.NoError(t, s.Update(context.Background(), metric.NewFloat64Number(1), &desc))
|
||||
s.Checkpoint(context.Background(), &desc)
|
||||
s.Checkpoint(&desc)
|
||||
if m, err := sum(&desc, &labels, s); assert.NoError(t, err) {
|
||||
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
|
||||
assert.Equal(t, []*metricpb.DoubleDataPoint{{Value: 1}}, m.DoubleDataPoints)
|
||||
|
@ -657,7 +657,7 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
|
||||
default:
|
||||
t.Fatalf("invalid number kind: %v", r.nKind)
|
||||
}
|
||||
agg.Checkpoint(ctx, &desc)
|
||||
agg.Checkpoint(&desc)
|
||||
|
||||
equiv := r.resource.Equivalent()
|
||||
resources[equiv] = r.resource
|
||||
|
@ -62,11 +62,12 @@ type Integrator interface {
|
||||
|
||||
// Process is called by the SDK once per internal record,
|
||||
// passing the export Record (a Descriptor, the corresponding
|
||||
// Labels, and the checkpointed Aggregator).
|
||||
//
|
||||
// The Context argument originates from the controller that
|
||||
// orchestrates collection.
|
||||
Process(ctx context.Context, record Record) error
|
||||
// Labels, and the checkpointed Aggregator). This call has no
|
||||
// Context argument because it is expected to perform only
|
||||
// computation. An SDK is not expected to call exporters from
|
||||
// with Process, use a controller for that (see
|
||||
// ./controllers/{pull,push}.
|
||||
Process(record Record) error
|
||||
}
|
||||
|
||||
// AggregationSelector supports selecting the kind of Aggregator to
|
||||
@ -119,9 +120,9 @@ type Aggregator interface {
|
||||
// accessed using by converting to one a suitable interface
|
||||
// types in the `aggregator` sub-package.
|
||||
//
|
||||
// The Context argument originates from the controller that
|
||||
// orchestrates collection.
|
||||
Checkpoint(context.Context, *metric.Descriptor)
|
||||
// This call has no Context argument because it is expected to
|
||||
// perform only computation.
|
||||
Checkpoint(*metric.Descriptor)
|
||||
|
||||
// Merge combines the checkpointed state from the argument
|
||||
// aggregator into this aggregator's checkpointed state.
|
||||
|
31
sdk/instrumentation/library.go
Normal file
31
sdk/instrumentation/library.go
Normal file
@ -0,0 +1,31 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
/*
|
||||
Package instrumentation provides an instrumentation library structure to be
|
||||
passed to both the OpenTelemetry Tracer and Meter components.
|
||||
|
||||
For more information see
|
||||
[this](https://github.com/open-telemetry/oteps/blob/master/text/0083-component.md).
|
||||
*/
|
||||
package instrumentation
|
||||
|
||||
// Library represents the instrumentation library.
|
||||
type Library struct {
|
||||
// Name is the name of the instrumentation library. This should be the
|
||||
// Go package name of that library.
|
||||
Name string
|
||||
// Version is the version of the instrumentation library.
|
||||
Version string
|
||||
}
|
@ -85,7 +85,7 @@ func (c *Aggregator) Points() ([]metric.Number, error) {
|
||||
|
||||
// Checkpoint saves the current state and resets the current state to
|
||||
// the empty set, taking a lock to prevent concurrent Update() calls.
|
||||
func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) {
|
||||
func (c *Aggregator) Checkpoint(desc *metric.Descriptor) {
|
||||
c.lock.Lock()
|
||||
c.checkpoint, c.current = c.current, nil
|
||||
c.lock.Unlock()
|
||||
|
@ -15,7 +15,6 @@
|
||||
package array
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
@ -66,8 +65,7 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
|
||||
test.CheckedUpdate(t, agg, y, descriptor)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
agg.Checkpoint(ctx, descriptor)
|
||||
agg.Checkpoint(descriptor)
|
||||
|
||||
all.Sort()
|
||||
|
||||
@ -116,8 +114,6 @@ type mergeTest struct {
|
||||
}
|
||||
|
||||
func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
|
||||
ctx := context.Background()
|
||||
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg1 := New()
|
||||
@ -145,8 +141,8 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
|
||||
}
|
||||
}
|
||||
|
||||
agg1.Checkpoint(ctx, descriptor)
|
||||
agg2.Checkpoint(ctx, descriptor)
|
||||
agg1.Checkpoint(descriptor)
|
||||
agg2.Checkpoint(descriptor)
|
||||
|
||||
test.CheckedMerge(t, agg1, agg2, descriptor)
|
||||
|
||||
@ -213,8 +209,6 @@ func TestArrayErrors(t *testing.T) {
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, aggregator.ErrNoData)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
test.CheckedUpdate(t, agg, metric.Number(0), descriptor)
|
||||
@ -222,7 +216,7 @@ func TestArrayErrors(t *testing.T) {
|
||||
if profile.NumberKind == metric.Float64NumberKind {
|
||||
test.CheckedUpdate(t, agg, metric.NewFloat64Number(math.NaN()), descriptor)
|
||||
}
|
||||
agg.Checkpoint(ctx, descriptor)
|
||||
agg.Checkpoint(descriptor)
|
||||
|
||||
count, err := agg.Count()
|
||||
require.Equal(t, int64(1), count, "NaN value was not counted")
|
||||
@ -275,7 +269,6 @@ func TestArrayFloat64(t *testing.T) {
|
||||
|
||||
all := test.NewNumbers(metric.Float64NumberKind)
|
||||
|
||||
ctx := context.Background()
|
||||
agg := New()
|
||||
|
||||
for _, f := range fpsf(1) {
|
||||
@ -288,7 +281,7 @@ func TestArrayFloat64(t *testing.T) {
|
||||
test.CheckedUpdate(t, agg, metric.NewFloat64Number(f), descriptor)
|
||||
}
|
||||
|
||||
agg.Checkpoint(ctx, descriptor)
|
||||
agg.Checkpoint(descriptor)
|
||||
|
||||
all.Sort()
|
||||
|
||||
|
@ -103,7 +103,7 @@ func (c *Aggregator) toNumber(f float64) metric.Number {
|
||||
|
||||
// Checkpoint saves the current state and resets the current state to
|
||||
// the empty set, taking a lock to prevent concurrent Update() calls.
|
||||
func (c *Aggregator) Checkpoint(ctx context.Context, _ *metric.Descriptor) {
|
||||
func (c *Aggregator) Checkpoint(*metric.Descriptor) {
|
||||
replace := sdk.NewDDSketch(c.cfg)
|
||||
|
||||
c.lock.Lock()
|
||||
|
@ -15,7 +15,6 @@
|
||||
package ddsketch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
@ -31,8 +30,6 @@ type updateTest struct {
|
||||
}
|
||||
|
||||
func (ut *updateTest) run(t *testing.T, profile test.Profile) {
|
||||
ctx := context.Background()
|
||||
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
agg := New(descriptor, NewDefaultConfig())
|
||||
|
||||
@ -47,7 +44,7 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
|
||||
test.CheckedUpdate(t, agg, y, descriptor)
|
||||
}
|
||||
|
||||
agg.Checkpoint(ctx, descriptor)
|
||||
agg.Checkpoint(descriptor)
|
||||
|
||||
all.Sort()
|
||||
|
||||
@ -91,7 +88,6 @@ type mergeTest struct {
|
||||
}
|
||||
|
||||
func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
|
||||
ctx := context.Background()
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg1 := New(descriptor, NewDefaultConfig())
|
||||
@ -122,8 +118,8 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
|
||||
}
|
||||
}
|
||||
|
||||
agg1.Checkpoint(ctx, descriptor)
|
||||
agg2.Checkpoint(ctx, descriptor)
|
||||
agg1.Checkpoint(descriptor)
|
||||
agg2.Checkpoint(descriptor)
|
||||
|
||||
test.CheckedMerge(t, agg1, agg2, descriptor)
|
||||
|
||||
|
@ -107,7 +107,7 @@ func (c *Aggregator) Histogram() (aggregator.Buckets, error) {
|
||||
// the empty set. Since no locks are taken, there is a chance that
|
||||
// the independent Sum, Count and Bucket Count are not consistent with each
|
||||
// other.
|
||||
func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) {
|
||||
func (c *Aggregator) Checkpoint(desc *metric.Descriptor) {
|
||||
c.lock.Lock()
|
||||
c.checkpoint, c.current = c.current, emptyState(c.boundaries)
|
||||
c.lock.Unlock()
|
||||
|
@ -15,7 +15,6 @@
|
||||
package histogram_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"math/rand"
|
||||
"sort"
|
||||
@ -81,7 +80,6 @@ func TestHistogramPositiveAndNegative(t *testing.T) {
|
||||
|
||||
// Validates count, sum and buckets for a given profile and policy
|
||||
func testHistogram(t *testing.T, profile test.Profile, policy policy) {
|
||||
ctx := context.Background()
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg := histogram.New(descriptor, boundaries)
|
||||
@ -94,7 +92,7 @@ func testHistogram(t *testing.T, profile test.Profile, policy policy) {
|
||||
test.CheckedUpdate(t, agg, x, descriptor)
|
||||
}
|
||||
|
||||
agg.Checkpoint(ctx, descriptor)
|
||||
agg.Checkpoint(descriptor)
|
||||
|
||||
all.Sort()
|
||||
|
||||
@ -137,8 +135,6 @@ func TestHistogramInitial(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHistogramMerge(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
@ -158,8 +154,8 @@ func TestHistogramMerge(t *testing.T) {
|
||||
test.CheckedUpdate(t, agg2, x, descriptor)
|
||||
}
|
||||
|
||||
agg1.Checkpoint(ctx, descriptor)
|
||||
agg2.Checkpoint(ctx, descriptor)
|
||||
agg1.Checkpoint(descriptor)
|
||||
agg2.Checkpoint(descriptor)
|
||||
|
||||
test.CheckedMerge(t, agg1, agg2, descriptor)
|
||||
|
||||
@ -192,13 +188,11 @@ func TestHistogramMerge(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHistogramNotSet(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg := histogram.New(descriptor, boundaries)
|
||||
agg.Checkpoint(ctx, descriptor)
|
||||
agg.Checkpoint(descriptor)
|
||||
|
||||
asum, err := agg.Sum()
|
||||
require.Equal(t, metric.Number(0), asum, "Empty checkpoint sum = 0")
|
||||
|
@ -80,7 +80,7 @@ func (g *Aggregator) LastValue() (metric.Number, time.Time, error) {
|
||||
}
|
||||
|
||||
// Checkpoint atomically saves the current value.
|
||||
func (g *Aggregator) Checkpoint(ctx context.Context, _ *metric.Descriptor) {
|
||||
func (g *Aggregator) Checkpoint(*metric.Descriptor) {
|
||||
g.checkpoint = atomic.LoadPointer(&g.current)
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,6 @@
|
||||
package lastvalue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"os"
|
||||
"testing"
|
||||
@ -50,8 +49,6 @@ func TestMain(m *testing.M) {
|
||||
}
|
||||
|
||||
func TestLastValueUpdate(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
agg := New()
|
||||
|
||||
@ -64,7 +61,7 @@ func TestLastValueUpdate(t *testing.T) {
|
||||
test.CheckedUpdate(t, agg, x, record)
|
||||
}
|
||||
|
||||
agg.Checkpoint(ctx, record)
|
||||
agg.Checkpoint(record)
|
||||
|
||||
lv, _, err := agg.LastValue()
|
||||
require.Equal(t, last, lv, "Same last value - non-monotonic")
|
||||
@ -73,8 +70,6 @@ func TestLastValueUpdate(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLastValueMerge(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
agg1 := New()
|
||||
agg2 := New()
|
||||
@ -88,8 +83,8 @@ func TestLastValueMerge(t *testing.T) {
|
||||
test.CheckedUpdate(t, agg1, first1, descriptor)
|
||||
test.CheckedUpdate(t, agg2, first2, descriptor)
|
||||
|
||||
agg1.Checkpoint(ctx, descriptor)
|
||||
agg2.Checkpoint(ctx, descriptor)
|
||||
agg1.Checkpoint(descriptor)
|
||||
agg2.Checkpoint(descriptor)
|
||||
|
||||
_, t1, err := agg1.LastValue()
|
||||
require.Nil(t, err)
|
||||
@ -110,7 +105,7 @@ func TestLastValueNotSet(t *testing.T) {
|
||||
descriptor := test.NewAggregatorTest(metric.ValueObserverKind, metric.Int64NumberKind)
|
||||
|
||||
g := New()
|
||||
g.Checkpoint(context.Background(), descriptor)
|
||||
g.Checkpoint(descriptor)
|
||||
|
||||
value, timestamp, err := g.LastValue()
|
||||
require.Equal(t, aggregator.ErrNoData, err)
|
||||
|
@ -102,7 +102,7 @@ func (c *Aggregator) Max() (metric.Number, error) {
|
||||
|
||||
// Checkpoint saves the current state and resets the current state to
|
||||
// the empty set.
|
||||
func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) {
|
||||
func (c *Aggregator) Checkpoint(desc *metric.Descriptor) {
|
||||
c.lock.Lock()
|
||||
c.checkpoint, c.current = c.current, c.emptyState()
|
||||
c.lock.Unlock()
|
||||
|
@ -15,7 +15,6 @@
|
||||
package minmaxsumcount
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"math/rand"
|
||||
"testing"
|
||||
@ -78,7 +77,6 @@ func TestMinMaxSumCountPositiveAndNegative(t *testing.T) {
|
||||
|
||||
// Validates min, max, sum and count for a given profile and policy
|
||||
func minMaxSumCount(t *testing.T, profile test.Profile, policy policy) {
|
||||
ctx := context.Background()
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg := New(descriptor)
|
||||
@ -91,7 +89,7 @@ func minMaxSumCount(t *testing.T, profile test.Profile, policy policy) {
|
||||
test.CheckedUpdate(t, agg, x, descriptor)
|
||||
}
|
||||
|
||||
agg.Checkpoint(ctx, descriptor)
|
||||
agg.Checkpoint(descriptor)
|
||||
|
||||
all.Sort()
|
||||
|
||||
@ -124,8 +122,6 @@ func minMaxSumCount(t *testing.T, profile test.Profile, policy policy) {
|
||||
}
|
||||
|
||||
func TestMinMaxSumCountMerge(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
@ -145,8 +141,8 @@ func TestMinMaxSumCountMerge(t *testing.T) {
|
||||
test.CheckedUpdate(t, agg2, x, descriptor)
|
||||
}
|
||||
|
||||
agg1.Checkpoint(ctx, descriptor)
|
||||
agg2.Checkpoint(ctx, descriptor)
|
||||
agg1.Checkpoint(descriptor)
|
||||
agg2.Checkpoint(descriptor)
|
||||
|
||||
test.CheckedMerge(t, agg1, agg2, descriptor)
|
||||
|
||||
@ -182,13 +178,11 @@ func TestMinMaxSumCountMerge(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMaxSumCountNotSet(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
|
||||
|
||||
agg := New(descriptor)
|
||||
agg.Checkpoint(ctx, descriptor)
|
||||
agg.Checkpoint(descriptor)
|
||||
|
||||
asum, err := agg.Sum()
|
||||
require.Equal(t, metric.Number(0), asum, "Empty checkpoint sum = 0")
|
||||
|
@ -51,7 +51,7 @@ func (c *Aggregator) Sum() (metric.Number, error) {
|
||||
|
||||
// Checkpoint atomically saves the current value and resets the
|
||||
// current sum to zero.
|
||||
func (c *Aggregator) Checkpoint(ctx context.Context, _ *metric.Descriptor) {
|
||||
func (c *Aggregator) Checkpoint(*metric.Descriptor) {
|
||||
c.checkpoint = c.current.SwapNumberAtomic(metric.Number(0))
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,6 @@
|
||||
package sum
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"unsafe"
|
||||
@ -49,8 +48,6 @@ func TestMain(m *testing.M) {
|
||||
}
|
||||
|
||||
func TestCounterSum(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
agg := New()
|
||||
|
||||
@ -63,7 +60,7 @@ func TestCounterSum(t *testing.T) {
|
||||
test.CheckedUpdate(t, agg, x, descriptor)
|
||||
}
|
||||
|
||||
agg.Checkpoint(ctx, descriptor)
|
||||
agg.Checkpoint(descriptor)
|
||||
|
||||
asum, err := agg.Sum()
|
||||
require.Equal(t, sum, asum, "Same sum - monotonic")
|
||||
@ -72,8 +69,6 @@ func TestCounterSum(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestValueRecorderSum(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
agg := New()
|
||||
|
||||
@ -90,7 +85,7 @@ func TestValueRecorderSum(t *testing.T) {
|
||||
sum.AddNumber(profile.NumberKind, r2)
|
||||
}
|
||||
|
||||
agg.Checkpoint(ctx, descriptor)
|
||||
agg.Checkpoint(descriptor)
|
||||
|
||||
asum, err := agg.Sum()
|
||||
require.Equal(t, sum, asum, "Same sum - monotonic")
|
||||
@ -99,8 +94,6 @@ func TestValueRecorderSum(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCounterMerge(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
agg1 := New()
|
||||
agg2 := New()
|
||||
@ -115,8 +108,8 @@ func TestCounterMerge(t *testing.T) {
|
||||
test.CheckedUpdate(t, agg2, x, descriptor)
|
||||
}
|
||||
|
||||
agg1.Checkpoint(ctx, descriptor)
|
||||
agg2.Checkpoint(ctx, descriptor)
|
||||
agg1.Checkpoint(descriptor)
|
||||
agg2.Checkpoint(descriptor)
|
||||
|
||||
test.CheckedMerge(t, agg1, agg2, descriptor)
|
||||
|
||||
|
@ -32,13 +32,10 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
)
|
||||
|
||||
type processFunc func(context.Context, export.Record) error
|
||||
|
||||
type benchFixture struct {
|
||||
meter metric.MeterMust
|
||||
accumulator *sdk.Accumulator
|
||||
B *testing.B
|
||||
pcb processFunc
|
||||
}
|
||||
|
||||
func newFixture(b *testing.B) *benchFixture {
|
||||
@ -52,10 +49,6 @@ func newFixture(b *testing.B) *benchFixture {
|
||||
return bf
|
||||
}
|
||||
|
||||
func (f *benchFixture) setProcessCallback(cb processFunc) {
|
||||
f.pcb = cb
|
||||
}
|
||||
|
||||
func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
|
||||
name := descriptor.Name()
|
||||
switch {
|
||||
@ -75,11 +68,8 @@ func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggrega
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *benchFixture) Process(ctx context.Context, rec export.Record) error {
|
||||
if f.pcb == nil {
|
||||
return nil
|
||||
}
|
||||
return f.pcb(ctx, rec)
|
||||
func (f *benchFixture) Process(rec export.Record) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*benchFixture) CheckpointSet() export.CheckpointSet {
|
||||
@ -201,28 +191,14 @@ func BenchmarkAcquireReleaseExistingHandle(b *testing.B) {
|
||||
var benchmarkIteratorVar kv.KeyValue
|
||||
|
||||
func benchmarkIterator(b *testing.B, n int) {
|
||||
fix := newFixture(b)
|
||||
fix.setProcessCallback(func(ctx context.Context, rec export.Record) error {
|
||||
var kv kv.KeyValue
|
||||
li := rec.Labels().Iter()
|
||||
fix.B.StartTimer()
|
||||
for i := 0; i < fix.B.N; i++ {
|
||||
iter := li
|
||||
// test getting only the first element
|
||||
if iter.Next() {
|
||||
kv = iter.Label()
|
||||
}
|
||||
}
|
||||
fix.B.StopTimer()
|
||||
benchmarkIteratorVar = kv
|
||||
return nil
|
||||
})
|
||||
cnt := fix.meter.NewInt64Counter("int64.counter")
|
||||
ctx := context.Background()
|
||||
cnt.Add(ctx, 1, makeLabels(n)...)
|
||||
|
||||
labels := label.NewSet(makeLabels(n)...)
|
||||
b.ResetTimer()
|
||||
fix.accumulator.Collect(ctx)
|
||||
for i := 0; i < b.N; i++ {
|
||||
iter := labels.Iter()
|
||||
for iter.Next() {
|
||||
benchmarkIteratorVar = iter.Label()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkIterator_0(b *testing.B) {
|
||||
@ -560,11 +536,6 @@ func BenchmarkBatchRecord_8Labels_8Instruments(b *testing.B) {
|
||||
func BenchmarkRepeatedDirectCalls(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
fix := newFixture(b)
|
||||
encoder := label.DefaultEncoder()
|
||||
fix.pcb = func(_ context.Context, rec export.Record) error {
|
||||
_ = rec.Labels().Encoded(encoder)
|
||||
return nil
|
||||
}
|
||||
|
||||
c := fix.meter.NewInt64Counter("int64.counter")
|
||||
k := kv.String("bench", "true")
|
||||
@ -576,39 +547,3 @@ func BenchmarkRepeatedDirectCalls(b *testing.B) {
|
||||
fix.accumulator.Collect(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// LabelIterator
|
||||
|
||||
func BenchmarkLabelIterator(b *testing.B) {
|
||||
const labelCount = 1024
|
||||
ctx := context.Background()
|
||||
fix := newFixture(b)
|
||||
|
||||
var rec export.Record
|
||||
fix.pcb = func(_ context.Context, processRec export.Record) error {
|
||||
rec = processRec
|
||||
return nil
|
||||
}
|
||||
|
||||
keyValues := makeLabels(labelCount)
|
||||
counter := fix.meter.NewInt64Counter("test.counter")
|
||||
counter.Add(ctx, 1, keyValues...)
|
||||
|
||||
fix.accumulator.Collect(ctx)
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
labels := rec.Labels()
|
||||
iter := labels.Iter()
|
||||
var val kv.KeyValue
|
||||
for i := 0; i < b.N; i++ {
|
||||
if !iter.Next() {
|
||||
iter = labels.Iter()
|
||||
iter.Next()
|
||||
}
|
||||
val = iter.Label()
|
||||
}
|
||||
if false {
|
||||
fmt.Println(val)
|
||||
}
|
||||
}
|
||||
|
@ -119,7 +119,7 @@ func (ci *correctnessIntegrator) CheckpointSet() export.CheckpointSet {
|
||||
func (*correctnessIntegrator) FinishedCollection() {
|
||||
}
|
||||
|
||||
func (ci *correctnessIntegrator) Process(_ context.Context, record export.Record) error {
|
||||
func (ci *correctnessIntegrator) Process(record export.Record) error {
|
||||
ci.records = append(ci.records, record)
|
||||
return nil
|
||||
}
|
||||
|
@ -44,7 +44,7 @@ func TestStressInt64Histogram(t *testing.T) {
|
||||
|
||||
startTime := time.Now()
|
||||
for time.Since(startTime) < time.Second {
|
||||
h.Checkpoint(context.Background(), &desc)
|
||||
h.Checkpoint(&desc)
|
||||
|
||||
b, _ := h.Histogram()
|
||||
c, _ := h.Count()
|
||||
|
@ -15,7 +15,6 @@
|
||||
package simple // import "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
@ -65,7 +64,7 @@ func New(selector export.AggregationSelector, stateful bool) *Integrator {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Integrator) Process(_ context.Context, record export.Record) error {
|
||||
func (b *Integrator) Process(record export.Record) error {
|
||||
desc := record.Descriptor()
|
||||
key := batchKey{
|
||||
descriptor: desc,
|
||||
|
@ -30,34 +30,33 @@ import (
|
||||
// These tests use the ../test label encoding.
|
||||
|
||||
func TestSimpleStateless(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
b := simple.New(test.NewAggregationSelector(), false)
|
||||
|
||||
// Set initial lastValue values
|
||||
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 10))
|
||||
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels2, 20))
|
||||
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels3, 30))
|
||||
_ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 10))
|
||||
_ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels2, 20))
|
||||
_ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels3, 30))
|
||||
|
||||
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueBDesc, test.Labels1, 10))
|
||||
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueBDesc, test.Labels2, 20))
|
||||
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueBDesc, test.Labels3, 30))
|
||||
_ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels1, 10))
|
||||
_ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels2, 20))
|
||||
_ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels3, 30))
|
||||
|
||||
// Another lastValue Set for Labels1
|
||||
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 50))
|
||||
_ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueBDesc, test.Labels1, 50))
|
||||
_ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 50))
|
||||
_ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels1, 50))
|
||||
|
||||
// Set initial counter values
|
||||
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10))
|
||||
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterADesc, test.Labels2, 20))
|
||||
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterADesc, test.Labels3, 40))
|
||||
_ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10))
|
||||
_ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels2, 20))
|
||||
_ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels3, 40))
|
||||
|
||||
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 10))
|
||||
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels2, 20))
|
||||
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels3, 40))
|
||||
_ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 10))
|
||||
_ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels2, 20))
|
||||
_ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels3, 40))
|
||||
|
||||
// Another counter Add for Labels1
|
||||
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterADesc, test.Labels1, 50))
|
||||
_ = b.Process(ctx, test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 50))
|
||||
_ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels1, 50))
|
||||
_ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 50))
|
||||
|
||||
checkpointSet := b.CheckpointSet()
|
||||
|
||||
@ -97,11 +96,11 @@ func TestSimpleStateful(t *testing.T) {
|
||||
|
||||
counterA := test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10)
|
||||
caggA := counterA.Aggregator()
|
||||
_ = b.Process(ctx, counterA)
|
||||
_ = b.Process(counterA)
|
||||
|
||||
counterB := test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 10)
|
||||
caggB := counterB.Aggregator()
|
||||
_ = b.Process(ctx, counterB)
|
||||
_ = b.Process(counterB)
|
||||
|
||||
checkpointSet := b.CheckpointSet()
|
||||
b.FinishedCollection()
|
||||
@ -126,8 +125,8 @@ func TestSimpleStateful(t *testing.T) {
|
||||
// Update and re-checkpoint the original record.
|
||||
_ = caggA.Update(ctx, metric.NewInt64Number(20), &test.CounterADesc)
|
||||
_ = caggB.Update(ctx, metric.NewInt64Number(20), &test.CounterBDesc)
|
||||
caggA.Checkpoint(ctx, &test.CounterADesc)
|
||||
caggB.Checkpoint(ctx, &test.CounterBDesc)
|
||||
caggA.Checkpoint(&test.CounterADesc)
|
||||
caggB.Checkpoint(&test.CounterBDesc)
|
||||
|
||||
// As yet cagg has not been passed to Integrator.Process. Should
|
||||
// not see an update.
|
||||
@ -140,8 +139,8 @@ func TestSimpleStateful(t *testing.T) {
|
||||
b.FinishedCollection()
|
||||
|
||||
// Now process the second update
|
||||
_ = b.Process(ctx, export.NewRecord(&test.CounterADesc, test.Labels1, test.Resource, caggA))
|
||||
_ = b.Process(ctx, export.NewRecord(&test.CounterBDesc, test.Labels1, test.Resource, caggB))
|
||||
_ = b.Process(export.NewRecord(&test.CounterADesc, test.Labels1, test.Resource, caggA))
|
||||
_ = b.Process(export.NewRecord(&test.CounterBDesc, test.Labels1, test.Resource, caggB))
|
||||
|
||||
checkpointSet = b.CheckpointSet()
|
||||
|
||||
|
@ -131,7 +131,7 @@ func LastValueAgg(desc *metric.Descriptor, v int64) export.Aggregator {
|
||||
ctx := context.Background()
|
||||
gagg := lastvalue.New()
|
||||
_ = gagg.Update(ctx, metric.NewInt64Number(v), desc)
|
||||
gagg.Checkpoint(ctx, desc)
|
||||
gagg.Checkpoint(desc)
|
||||
return gagg
|
||||
}
|
||||
|
||||
@ -150,7 +150,7 @@ func CounterAgg(desc *metric.Descriptor, v int64) export.Aggregator {
|
||||
ctx := context.Background()
|
||||
cagg := sum.New()
|
||||
_ = cagg.Update(ctx, metric.NewInt64Number(v), desc)
|
||||
cagg.Checkpoint(ctx, desc)
|
||||
cagg.Checkpoint(desc)
|
||||
return cagg
|
||||
}
|
||||
|
||||
|
@ -46,7 +46,7 @@ func TestStressInt64MinMaxSumCount(t *testing.T) {
|
||||
|
||||
startTime := time.Now()
|
||||
for time.Since(startTime) < time.Second {
|
||||
mmsc.Checkpoint(context.Background(), &desc)
|
||||
mmsc.Checkpoint(&desc)
|
||||
|
||||
s, _ := mmsc.Sum()
|
||||
c, _ := mmsc.Count()
|
||||
|
@ -50,7 +50,6 @@ type (
|
||||
// `*asyncInstrument` instances
|
||||
asyncLock sync.Mutex
|
||||
asyncInstruments *internal.AsyncInstrumentState
|
||||
asyncContext context.Context
|
||||
|
||||
// currentEpoch is the current epoch number. It is
|
||||
// incremented in `Collect()`.
|
||||
@ -354,13 +353,13 @@ func (m *Accumulator) Collect(ctx context.Context) int {
|
||||
defer m.collectLock.Unlock()
|
||||
|
||||
checkpointed := m.observeAsyncInstruments(ctx)
|
||||
checkpointed += m.collectSyncInstruments(ctx)
|
||||
checkpointed += m.collectSyncInstruments()
|
||||
m.currentEpoch++
|
||||
|
||||
return checkpointed
|
||||
}
|
||||
|
||||
func (m *Accumulator) collectSyncInstruments(ctx context.Context) int {
|
||||
func (m *Accumulator) collectSyncInstruments() int {
|
||||
checkpointed := 0
|
||||
|
||||
m.current.Range(func(key interface{}, value interface{}) bool {
|
||||
@ -374,7 +373,7 @@ func (m *Accumulator) collectSyncInstruments(ctx context.Context) int {
|
||||
if mods != coll {
|
||||
// Updates happened in this interval,
|
||||
// checkpoint and continue.
|
||||
checkpointed += m.checkpointRecord(ctx, inuse)
|
||||
checkpointed += m.checkpointRecord(inuse)
|
||||
inuse.collectedCount = mods
|
||||
return true
|
||||
}
|
||||
@ -395,7 +394,7 @@ func (m *Accumulator) collectSyncInstruments(ctx context.Context) int {
|
||||
// last we'll see of this record, checkpoint
|
||||
mods = atomic.LoadInt64(&inuse.updateCount)
|
||||
if mods != coll {
|
||||
checkpointed += m.checkpointRecord(ctx, inuse)
|
||||
checkpointed += m.checkpointRecord(inuse)
|
||||
}
|
||||
return true
|
||||
})
|
||||
@ -419,10 +418,9 @@ func (m *Accumulator) observeAsyncInstruments(ctx context.Context) int {
|
||||
defer m.asyncLock.Unlock()
|
||||
|
||||
asyncCollected := 0
|
||||
m.asyncContext = ctx
|
||||
|
||||
// TODO: change this to `ctx` (in a separate PR, with tests)
|
||||
m.asyncInstruments.Run(context.Background(), m)
|
||||
m.asyncContext = nil
|
||||
|
||||
for _, inst := range m.asyncInstruments.Instruments() {
|
||||
if a := m.fromAsync(inst); a != nil {
|
||||
@ -433,8 +431,8 @@ func (m *Accumulator) observeAsyncInstruments(ctx context.Context) int {
|
||||
return asyncCollected
|
||||
}
|
||||
|
||||
func (m *Accumulator) checkpointRecord(ctx context.Context, r *record) int {
|
||||
return m.checkpoint(ctx, &r.inst.descriptor, r.recorder, r.labels)
|
||||
func (m *Accumulator) checkpointRecord(r *record) int {
|
||||
return m.checkpoint(&r.inst.descriptor, r.recorder, r.labels)
|
||||
}
|
||||
|
||||
func (m *Accumulator) checkpointAsync(a *asyncInstrument) int {
|
||||
@ -446,7 +444,7 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int {
|
||||
lrec := lrec
|
||||
epochDiff := m.currentEpoch - lrec.observedEpoch
|
||||
if epochDiff == 0 {
|
||||
checkpointed += m.checkpoint(m.asyncContext, &a.descriptor, lrec.recorder, lrec.labels)
|
||||
checkpointed += m.checkpoint(&a.descriptor, lrec.recorder, lrec.labels)
|
||||
} else if epochDiff > 1 {
|
||||
// This is second collection cycle with no
|
||||
// observations for this labelset. Remove the
|
||||
@ -460,14 +458,14 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int {
|
||||
return checkpointed
|
||||
}
|
||||
|
||||
func (m *Accumulator) checkpoint(ctx context.Context, descriptor *metric.Descriptor, recorder export.Aggregator, labels *label.Set) int {
|
||||
func (m *Accumulator) checkpoint(descriptor *metric.Descriptor, recorder export.Aggregator, labels *label.Set) int {
|
||||
if recorder == nil {
|
||||
return 0
|
||||
}
|
||||
recorder.Checkpoint(ctx, descriptor)
|
||||
recorder.Checkpoint(descriptor)
|
||||
|
||||
exportRecord := export.NewRecord(descriptor, labels, m.resource, recorder)
|
||||
err := m.integrator.Process(ctx, exportRecord)
|
||||
err := m.integrator.Process(exportRecord)
|
||||
if err != nil {
|
||||
global.Handle(err)
|
||||
}
|
||||
|
@ -263,7 +263,7 @@ func (*testFixture) CheckpointSet() export.CheckpointSet {
|
||||
func (*testFixture) FinishedCollection() {
|
||||
}
|
||||
|
||||
func (f *testFixture) Process(_ context.Context, record export.Record) error {
|
||||
func (f *testFixture) Process(record export.Record) error {
|
||||
labels := record.Labels().ToSlice()
|
||||
key := testKey{
|
||||
labels: canonicalizeLabels(labels),
|
||||
|
@ -105,9 +105,10 @@ func NewBatchSpanProcessor(e export.SpanBatcher, opts ...BatchSpanProcessorOptio
|
||||
queue: make(chan *export.SpanData, o.MaxQueueSize),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
bsp.stopWait.Add(1)
|
||||
|
||||
bsp.stopWait.Add(1)
|
||||
go func() {
|
||||
defer bsp.stopWait.Done()
|
||||
bsp.processQueue()
|
||||
bsp.drainQueue()
|
||||
}()
|
||||
@ -130,8 +131,6 @@ func (bsp *BatchSpanProcessor) Shutdown() {
|
||||
bsp.stopOnce.Do(func() {
|
||||
close(bsp.stopCh)
|
||||
bsp.stopWait.Wait()
|
||||
close(bsp.queue)
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
@ -173,7 +172,6 @@ func (bsp *BatchSpanProcessor) exportSpans() {
|
||||
// is shut down. It calls the exporter in batches of up to MaxExportBatchSize
|
||||
// waiting up to BatchTimeout to form a batch.
|
||||
func (bsp *BatchSpanProcessor) processQueue() {
|
||||
defer bsp.stopWait.Done()
|
||||
defer bsp.timer.Stop()
|
||||
|
||||
for {
|
||||
@ -197,13 +195,22 @@ func (bsp *BatchSpanProcessor) processQueue() {
|
||||
// drainQueue awaits the any caller that had added to bsp.stopWait
|
||||
// to finish the enqueue, then exports the final batch.
|
||||
func (bsp *BatchSpanProcessor) drainQueue() {
|
||||
for sd := range bsp.queue {
|
||||
bsp.batch = append(bsp.batch, sd)
|
||||
if len(bsp.batch) == bsp.o.MaxExportBatchSize {
|
||||
bsp.exportSpans()
|
||||
for {
|
||||
select {
|
||||
case sd := <-bsp.queue:
|
||||
if sd == nil {
|
||||
bsp.exportSpans()
|
||||
return
|
||||
}
|
||||
|
||||
bsp.batch = append(bsp.batch, sd)
|
||||
if len(bsp.batch) == bsp.o.MaxExportBatchSize {
|
||||
bsp.exportSpans()
|
||||
}
|
||||
default:
|
||||
close(bsp.queue)
|
||||
}
|
||||
}
|
||||
bsp.exportSpans()
|
||||
}
|
||||
|
||||
func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
|
||||
@ -226,17 +233,19 @@ func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
|
||||
panic(x)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-bsp.stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if bsp.o.BlockOnQueueFull {
|
||||
select {
|
||||
case bsp.queue <- sd:
|
||||
case <-bsp.stopCh:
|
||||
}
|
||||
bsp.queue <- sd
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case bsp.queue <- sd:
|
||||
case <-bsp.stopCh:
|
||||
default:
|
||||
atomic.AddUint32(&bsp.dropped, 1)
|
||||
}
|
||||
|
@ -117,7 +117,6 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
|
||||
sdktrace.WithBatchTimeout(schDelay),
|
||||
sdktrace.WithMaxQueueSize(200),
|
||||
sdktrace.WithMaxExportBatchSize(20),
|
||||
sdktrace.WithBlocking(),
|
||||
},
|
||||
wantNumSpans: 205,
|
||||
wantBatchCount: 11,
|
||||
@ -139,7 +138,6 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
|
||||
o: []sdktrace.BatchSpanProcessorOption{
|
||||
sdktrace.WithBatchTimeout(schDelay),
|
||||
sdktrace.WithMaxExportBatchSize(200),
|
||||
sdktrace.WithBlocking(),
|
||||
},
|
||||
wantNumSpans: 2000,
|
||||
wantBatchCount: 10,
|
||||
@ -162,18 +160,26 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
|
||||
|
||||
tp.UnregisterSpanProcessor(ssp)
|
||||
|
||||
// TODO(https://github.com/open-telemetry/opentelemetry-go/issues/741)
|
||||
// Restore some sort of test here.
|
||||
_ = option.wantNumSpans
|
||||
_ = option.wantBatchCount
|
||||
_ = te.len() // gotNumOfSpans
|
||||
_ = te.getBatchCount() // gotBatchCount
|
||||
gotNumOfSpans := te.len()
|
||||
if option.wantNumSpans != gotNumOfSpans {
|
||||
t.Errorf("number of exported span: got %+v, want %+v\n",
|
||||
gotNumOfSpans, option.wantNumSpans)
|
||||
}
|
||||
|
||||
gotBatchCount := te.getBatchCount()
|
||||
if gotBatchCount < option.wantBatchCount {
|
||||
t.Errorf("number batches: got %+v, want >= %+v\n",
|
||||
gotBatchCount, option.wantBatchCount)
|
||||
t.Errorf("Batches %v\n", te.sizes)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func createAndRegisterBatchSP(t *testing.T, option testOption, te *testBatchExporter) *sdktrace.BatchSpanProcessor {
|
||||
ssp, err := sdktrace.NewBatchSpanProcessor(te, option.o...)
|
||||
// Always use blocking queue to avoid flaky tests.
|
||||
options := append(option.o, sdktrace.WithBlocking())
|
||||
ssp, err := sdktrace.NewBatchSpanProcessor(te, options...)
|
||||
if ssp == nil {
|
||||
t.Errorf("%s: Error creating new instance of BatchSpanProcessor, error: %v\n", option.name, err)
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"sync/atomic"
|
||||
|
||||
export "go.opentelemetry.io/otel/sdk/export/trace"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
|
||||
apitrace "go.opentelemetry.io/otel/api/trace"
|
||||
@ -45,7 +46,7 @@ type ProviderOption func(*ProviderOptions)
|
||||
|
||||
type Provider struct {
|
||||
mu sync.Mutex
|
||||
namedTracer map[string]*tracer
|
||||
namedTracer map[instrumentation.Library]*tracer
|
||||
spanProcessors atomic.Value
|
||||
config atomic.Value // access atomically
|
||||
}
|
||||
@ -63,7 +64,7 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) {
|
||||
}
|
||||
|
||||
tp := &Provider{
|
||||
namedTracer: make(map[string]*tracer),
|
||||
namedTracer: make(map[instrumentation.Library]*tracer),
|
||||
}
|
||||
tp.config.Store(&Config{
|
||||
DefaultSampler: AlwaysSample(),
|
||||
@ -93,16 +94,27 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) {
|
||||
|
||||
// Tracer with the given name. If a tracer for the given name does not exist,
|
||||
// it is created first. If the name is empty, DefaultTracerName is used.
|
||||
func (p *Provider) Tracer(name string) apitrace.Tracer {
|
||||
func (p *Provider) Tracer(name string, opts ...apitrace.TracerOption) apitrace.Tracer {
|
||||
c := new(apitrace.TracerConfig)
|
||||
for _, o := range opts {
|
||||
o(c)
|
||||
}
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if name == "" {
|
||||
name = defaultTracerName
|
||||
}
|
||||
t, ok := p.namedTracer[name]
|
||||
il := instrumentation.Library{
|
||||
Name: name,
|
||||
Version: c.InstrumentationVersion,
|
||||
}
|
||||
t, ok := p.namedTracer[il]
|
||||
if !ok {
|
||||
t = &tracer{name: name, provider: p}
|
||||
p.namedTracer[name] = t
|
||||
t = &tracer{
|
||||
provider: p,
|
||||
instrumentationLibrary: il,
|
||||
}
|
||||
p.namedTracer[il] = t
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
@ -19,11 +19,12 @@ import (
|
||||
|
||||
apitrace "go.opentelemetry.io/otel/api/trace"
|
||||
"go.opentelemetry.io/otel/internal/trace/parent"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
)
|
||||
|
||||
type tracer struct {
|
||||
provider *Provider
|
||||
name string
|
||||
provider *Provider
|
||||
instrumentationLibrary instrumentation.Library
|
||||
}
|
||||
|
||||
var _ apitrace.Tracer = &tracer{}
|
||||
|
Loading…
x
Reference in New Issue
Block a user