1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-05-27 22:37:43 +02:00

OpenCensus metric exporter bridge ()

* add OpenCensus metric exporter bridge

* Update bridge/opencensus/README.md

Co-authored-by: Eric Sirianni <sirianni@users.noreply.github.com>

Co-authored-by: Eric Sirianni <sirianni@users.noreply.github.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
David Ashpole 2021-03-11 12:49:20 -05:00 committed by GitHub
parent 77aa218d4d
commit a1539d4485
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1319 additions and 9 deletions

@ -133,6 +133,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Added codeql worfklow to GitHub Actions (#1428)
- Added Gosec workflow to GitHub Actions (#1429)
- Add new HTTP driver for OTLP exporter in `exporters/otlp/otlphttp`. Currently it only supports the binary protobuf payloads. (#1420)
- Add an OpenCensus exporter bridge. (#1444)
### Changed

@ -2,7 +2,9 @@
The OpenCensus Bridge helps facilitate the migration of an application from OpenCensus to OpenTelemetry.
## The Problem: Mixing OpenCensus and OpenTelemetry libraries
## Tracing
### The Problem: Mixing OpenCensus and OpenTelemetry libraries
In a perfect world, one would simply migrate their entire go application --including custom instrumentation, libraries, and exporters-- from OpenCensus to OpenTelemetry all at once. In the real world, dependency constraints, third-party ownership of libraries, or other reasons may require mixing OpenCensus and OpenTelemetry libraries in a single application.
@ -44,10 +46,12 @@ The bridge implements the OpenCensus trace API using OpenTelemetry. This would
### User Journey
Starting from an application using entirely OpenCensus APIs:
1. Instantiate OpenTelemetry SDK and Exporters
2. Override OpenCensus' DefaultTracer with the bridge
3. Migrate libraries from OpenCensus to OpenTelemetry
4. Remove OpenCensus Exporters
3. Migrate libraries individually from OpenCensus to OpenTelemetry
4. Remove OpenCensus exporters and configuration
To override OpenCensus' DefaultTracer with the bridge:
```golang
@ -63,10 +67,56 @@ octrace.DefaultTracer = opencensus.NewTracer(tracer)
Be sure to set the `Tracer` name to your instrumentation package name instead of `"bridge"`.
### Incompatibilities
#### Incompatibilities
OpenCensus and OpenTelemetry APIs are not entirely compatible. If the bridge finds any incompatibilities, it will log them. Incompatibilities include:
* Custom OpenCensus Samplers specified during StartSpan are ignored.
* Links cannot be added to OpenCensus spans.
* OpenTelemetry Debug or Deferred trace flags are dropped after an OpenCensus span is created.
## Metrics
### The problem: mixing libraries without mixing pipelines
The problem for monitoring is simpler than the problem for tracing, since there
are no context propagation issues to deal with. However, it still is difficult
for users to migrate an entire applications' monitoring at once. It
should be possible to send metrics generated by OpenCensus libraries to an
OpenTelemetry pipeline so that migrating a metric does not require maintaining
separate export pipelines for OpenCensus and OpenTelemetry.
### The Exporter "wrapper" solution
The solution we use here is to allow wrapping an OpenTelemetry exporter such
that it implements the OpenCensus exporter interfaces. This allows a single
exporter to be used for metrics from *both* OpenCensus and OpenTelemetry.
### User Journey
Starting from an application using entirely OpenCensus APIs:
1. Instantiate OpenTelemetry SDK and Exporters.
2. Replace OpenCensus exporters with a wrapped OpenTelemetry exporter from step 1.
3. Migrate libraries individually from OpenCensus to OpenTelemetry
4. Remove OpenCensus Exporters and configuration.
For example, to swap out the OpenCensus logging exporter for the OpenTelemetry stdout exporter:
```golang
import (
"go.opencensus.io/metric/metricexport"
"go.opentelemetry.io/otel/bridge/opencensus"
"go.opentelemetry.io/otel/exporters/stdout"
"go.opentelemetry.io/otel"
)
// With OpenCensus, you could have previously configured the logging exporter like this:
// import logexporter "go.opencensus.io/examples/exporter"
// exporter, _ := logexporter.NewLogExporter(logexporter.Options{})
// Instead, we can create an equivalent using the OpenTelemetry stdout exporter:
openTelemetryExporter, _ := stdout.NewExporter(stdout.WithPrettyPrint())
exporter := opencensus.NewMetricExporter(openTelemetryExporter)
// Use the wrapped OpenTelemetry exporter like you normally would with OpenCensus
intervalReader, _ := metricexport.NewIntervalReader(&metricexport.Reader{}, exporter)
intervalReader.Start()
```

@ -0,0 +1,180 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opencensus
import (
"errors"
"fmt"
"time"
"go.opencensus.io/metric/metricdata"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
)
var (
errIncompatibleType = errors.New("incompatible type for aggregation")
errEmpty = errors.New("points may not be empty")
errBadPoint = errors.New("point cannot be converted")
)
// aggregationWithEndTime is an aggregation that can also provide the timestamp
// of the last recorded point.
type aggregationWithEndTime interface {
aggregation.Aggregation
end() time.Time
}
// newAggregationFromPoints creates an OpenTelemetry aggregation from
// OpenCensus points. Points may not be empty and must be either
// all (int|float)64 or all *metricdata.Distribution.
func newAggregationFromPoints(points []metricdata.Point) (aggregationWithEndTime, error) {
if len(points) == 0 {
return nil, errEmpty
}
switch t := points[0].Value.(type) {
case int64:
return newExactAggregator(points)
case float64:
return newExactAggregator(points)
case *metricdata.Distribution:
return newDistributionAggregator(points)
default:
// TODO add *metricdata.Summary support
return nil, fmt.Errorf("%w: %v", errIncompatibleType, t)
}
}
var _ aggregation.Aggregation = &ocExactAggregator{}
var _ aggregation.LastValue = &ocExactAggregator{}
var _ aggregation.Points = &ocExactAggregator{}
// newExactAggregator creates an OpenTelemetry aggreation from OpenCensus points.
// Points may not be empty, and must only contain integers or floats.
func newExactAggregator(pts []metricdata.Point) (aggregationWithEndTime, error) {
points := make([]aggregation.Point, len(pts))
for i, pt := range pts {
switch t := pt.Value.(type) {
case int64:
points[i] = aggregation.Point{
Number: number.NewInt64Number(pt.Value.(int64)),
Time: pt.Time,
}
case float64:
points[i] = aggregation.Point{
Number: number.NewFloat64Number(pt.Value.(float64)),
Time: pt.Time,
}
default:
return nil, fmt.Errorf("%w: %v", errIncompatibleType, t)
}
}
return &ocExactAggregator{
points: points,
}, nil
}
type ocExactAggregator struct {
points []aggregation.Point
}
// Kind returns the kind of aggregation this is.
func (o *ocExactAggregator) Kind() aggregation.Kind {
return aggregation.ExactKind
}
// Points returns access to the raw data set.
func (o *ocExactAggregator) Points() ([]aggregation.Point, error) {
return o.points, nil
}
// LastValue returns the last point.
func (o *ocExactAggregator) LastValue() (number.Number, time.Time, error) {
last := o.points[len(o.points)-1]
return last.Number, last.Time, nil
}
// end returns the timestamp of the last point
func (o *ocExactAggregator) end() time.Time {
_, t, _ := o.LastValue()
return t
}
var _ aggregation.Aggregation = &ocDistAggregator{}
var _ aggregation.Histogram = &ocDistAggregator{}
// newDistributionAggregator creates an OpenTelemetry aggreation from
// OpenCensus points. Points may not be empty, and must only contain
// Distributions. The most recent disribution will be used in the aggregation.
func newDistributionAggregator(pts []metricdata.Point) (aggregationWithEndTime, error) {
// only use the most recent datapoint for now.
pt := pts[len(pts)-1]
val, ok := pt.Value.(*metricdata.Distribution)
if !ok {
return nil, fmt.Errorf("%w: %v", errBadPoint, pt.Value)
}
bucketCounts := make([]uint64, len(val.Buckets))
for i, bucket := range val.Buckets {
if bucket.Count < 0 {
return nil, fmt.Errorf("%w: bucket count may not be negative", errBadPoint)
}
bucketCounts[i] = uint64(bucket.Count)
}
if val.Count < 0 {
return nil, fmt.Errorf("%w: count may not be negative", errBadPoint)
}
return &ocDistAggregator{
sum: number.NewFloat64Number(val.Sum),
count: uint64(val.Count),
buckets: aggregation.Buckets{
Boundaries: val.BucketOptions.Bounds,
Counts: bucketCounts,
},
endTime: pts[len(pts)-1].Time,
}, nil
}
type ocDistAggregator struct {
sum number.Number
count uint64
buckets aggregation.Buckets
endTime time.Time
}
// Kind returns the kind of aggregation this is.
func (o *ocDistAggregator) Kind() aggregation.Kind {
return aggregation.HistogramKind
}
// Sum returns the sum of values.
func (o *ocDistAggregator) Sum() (number.Number, error) {
return o.sum, nil
}
// Count returns the number of values.
func (o *ocDistAggregator) Count() (uint64, error) {
return o.count, nil
}
// Histogram returns the count of events in pre-determined buckets.
func (o *ocDistAggregator) Histogram() (aggregation.Buckets, error) {
return o.buckets, nil
}
// end returns the time the histogram was measured.
func (o *ocDistAggregator) end() time.Time {
return o.endTime
}

@ -0,0 +1,341 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opencensus
import (
"errors"
"testing"
"time"
"go.opencensus.io/metric/metricdata"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
)
func TestNewAggregationFromPoints(t *testing.T) {
now := time.Now()
for _, tc := range []struct {
desc string
input []metricdata.Point
expectedKind aggregation.Kind
expectedErr error
}{
{
desc: "no points",
expectedErr: errEmpty,
},
{
desc: "int point",
input: []metricdata.Point{
{
Time: now,
Value: int64(23),
},
},
expectedKind: aggregation.ExactKind,
},
{
desc: "float point",
input: []metricdata.Point{
{
Time: now,
Value: float64(23),
},
},
expectedKind: aggregation.ExactKind,
},
{
desc: "distribution point",
input: []metricdata.Point{
{
Time: now,
Value: &metricdata.Distribution{
Count: 2,
Sum: 55,
BucketOptions: &metricdata.BucketOptions{
Bounds: []float64{20, 30},
},
Buckets: []metricdata.Bucket{
{Count: 1},
{Count: 1},
},
},
},
},
expectedKind: aggregation.HistogramKind,
},
{
desc: "bad distribution bucket count",
input: []metricdata.Point{
{
Time: now,
Value: &metricdata.Distribution{
Count: 2,
Sum: 55,
BucketOptions: &metricdata.BucketOptions{
Bounds: []float64{20, 30},
},
Buckets: []metricdata.Bucket{
// negative bucket
{Count: -1},
{Count: 1},
},
},
},
},
expectedErr: errBadPoint,
},
{
desc: "bad distribution count",
input: []metricdata.Point{
{
Time: now,
Value: &metricdata.Distribution{
// negative count
Count: -2,
Sum: 55,
BucketOptions: &metricdata.BucketOptions{
Bounds: []float64{20, 30},
},
Buckets: []metricdata.Bucket{
{Count: 1},
{Count: 1},
},
},
},
},
expectedErr: errBadPoint,
},
{
desc: "incompatible point type bool",
input: []metricdata.Point{
{
Time: now,
Value: true,
},
},
expectedErr: errIncompatibleType,
},
{
desc: "dist is incompatible with exact",
input: []metricdata.Point{
{
Time: now,
Value: int64(23),
},
{
Time: now,
Value: &metricdata.Distribution{
Count: 2,
Sum: 55,
BucketOptions: &metricdata.BucketOptions{
Bounds: []float64{20, 30},
},
Buckets: []metricdata.Bucket{
{Count: 1},
{Count: 1},
},
},
},
},
expectedErr: errIncompatibleType,
},
{
desc: "int point is incompatible with dist",
input: []metricdata.Point{
{
Time: now,
Value: &metricdata.Distribution{
Count: 2,
Sum: 55,
BucketOptions: &metricdata.BucketOptions{
Bounds: []float64{20, 30},
},
Buckets: []metricdata.Bucket{
{Count: 1},
{Count: 1},
},
},
},
{
Time: now,
Value: int64(23),
},
},
expectedErr: errBadPoint,
},
} {
t.Run(tc.desc, func(t *testing.T) {
output, err := newAggregationFromPoints(tc.input)
if !errors.Is(err, tc.expectedErr) {
t.Errorf("newAggregationFromPoints(%v) = err(%v), want err(%v)", tc.input, err, tc.expectedErr)
}
if tc.expectedErr == nil && output.Kind() != tc.expectedKind {
t.Errorf("newAggregationFromPoints(%v) = %v, want %v", tc.input, output.Kind(), tc.expectedKind)
}
})
}
}
func TestPointsAggregation(t *testing.T) {
now := time.Now()
input := []metricdata.Point{
{Value: int64(15)},
{Value: int64(-23), Time: now},
}
output, err := newAggregationFromPoints(input)
if err != nil {
t.Fatalf("newAggregationFromPoints(%v) = err(%v), want <nil>", input, err)
}
if output.Kind() != aggregation.ExactKind {
t.Errorf("newAggregationFromPoints(%v) = %v, want %v", input, output.Kind(), aggregation.ExactKind)
}
if output.end() != now {
t.Errorf("newAggregationFromPoints(%v).end() = %v, want %v", input, output.end(), now)
}
pointsAgg, ok := output.(aggregation.Points)
if !ok {
t.Errorf("newAggregationFromPoints(%v) = %v does not implement the aggregation.Points interface", input, output)
}
points, err := pointsAgg.Points()
if err != nil {
t.Fatalf("Unexpected err: %v", err)
}
if len(points) != len(input) {
t.Fatalf("newAggregationFromPoints(%v) resulted in %d points, want %d points", input, len(points), len(input))
}
for i := range points {
inputPoint := input[i]
outputPoint := points[i]
if inputPoint.Value != outputPoint.AsInt64() {
t.Errorf("newAggregationFromPoints(%v)[%d] = %v, want %v", input, i, outputPoint.AsInt64(), inputPoint.Value)
}
}
}
func TestLastValueAggregation(t *testing.T) {
now := time.Now()
input := []metricdata.Point{
{Value: int64(15)},
{Value: int64(-23), Time: now},
}
output, err := newAggregationFromPoints(input)
if err != nil {
t.Fatalf("newAggregationFromPoints(%v) = err(%v), want <nil>", input, err)
}
if output.Kind() != aggregation.ExactKind {
t.Errorf("newAggregationFromPoints(%v) = %v, want %v", input, output.Kind(), aggregation.ExactKind)
}
if output.end() != now {
t.Errorf("newAggregationFromPoints(%v).end() = %v, want %v", input, output.end(), now)
}
lvAgg, ok := output.(aggregation.LastValue)
if !ok {
t.Errorf("newAggregationFromPoints(%v) = %v does not implement the aggregation.Points interface", input, output)
}
num, endTime, err := lvAgg.LastValue()
if err != nil {
t.Fatalf("Unexpected err: %v", err)
}
if endTime != now {
t.Errorf("newAggregationFromPoints(%v).LastValue() = endTime: %v, want %v", input, endTime, now)
}
if num.AsInt64() != int64(-23) {
t.Errorf("newAggregationFromPoints(%v).LastValue() = number: %v, want %v", input, num.AsInt64(), int64(-23))
}
}
func TestHistogramAggregation(t *testing.T) {
now := time.Now()
input := []metricdata.Point{
{
Value: &metricdata.Distribution{
Count: 0,
Sum: 0,
BucketOptions: &metricdata.BucketOptions{
Bounds: []float64{20, 30},
},
Buckets: []metricdata.Bucket{
{Count: 0},
{Count: 0},
},
},
},
{
Time: now,
Value: &metricdata.Distribution{
Count: 2,
Sum: 55,
BucketOptions: &metricdata.BucketOptions{
Bounds: []float64{20, 30},
},
Buckets: []metricdata.Bucket{
{Count: 1},
{Count: 1},
},
},
},
}
output, err := newAggregationFromPoints(input)
if err != nil {
t.Fatalf("newAggregationFromPoints(%v) = err(%v), want <nil>", input, err)
}
if output.Kind() != aggregation.HistogramKind {
t.Errorf("newAggregationFromPoints(%v) = %v, want %v", input, output.Kind(), aggregation.HistogramKind)
}
if output.end() != now {
t.Errorf("newAggregationFromPoints(%v).end() = %v, want %v", input, output.end(), now)
}
distAgg, ok := output.(aggregation.Histogram)
if !ok {
t.Errorf("newAggregationFromPoints(%v) = %v does not implement the aggregation.Points interface", input, output)
}
sum, err := distAgg.Sum()
if err != nil {
t.Fatalf("Unexpected err: %v", err)
}
if sum.AsFloat64() != float64(55) {
t.Errorf("newAggregationFromPoints(%v).Sum() = %v, want %v", input, sum.AsFloat64(), float64(55))
}
count, err := distAgg.Count()
if err != nil {
t.Fatalf("Unexpected err: %v", err)
}
if count != 2 {
t.Errorf("newAggregationFromPoints(%v).Count() = %v, want %v", input, count, 2)
}
hist, err := distAgg.Histogram()
if err != nil {
t.Fatalf("Unexpected err: %v", err)
}
inputBucketBoundaries := []float64{20, 30}
if len(hist.Boundaries) != len(inputBucketBoundaries) {
t.Fatalf("newAggregationFromPoints(%v).Histogram() produced %d boundaries, want %d boundaries", input, len(hist.Boundaries), len(inputBucketBoundaries))
}
for i, b := range hist.Boundaries {
if b != inputBucketBoundaries[i] {
t.Errorf("newAggregationFromPoints(%v).Histogram().Boundaries[%d] = %v, want %v", input, i, b, inputBucketBoundaries[i])
}
}
inputBucketCounts := []uint64{1, 1}
if len(hist.Counts) != len(inputBucketCounts) {
t.Fatalf("newAggregationFromPoints(%v).Histogram() produced %d buckets, want %d buckets", input, len(hist.Counts), len(inputBucketCounts))
}
for i, c := range hist.Counts {
if c != inputBucketCounts[i] {
t.Errorf("newAggregationFromPoints(%v).Histogram().Counts[%d] = %d, want %d", input, i, c, inputBucketCounts[i])
}
}
}

@ -0,0 +1,168 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opencensus
import (
"context"
"errors"
"fmt"
"sync"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricexport"
ocresource "go.opencensus.io/resource"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/unit"
)
var errConversion = errors.New("Unable to convert from OpenCensus to OpenTelemetry")
// NewMetricExporter returns an OpenCensus exporter that exports to an
// OpenTelemetry exporter
func NewMetricExporter(base export.Exporter) metricexport.Exporter {
return &exporter{base: base}
}
// exporter implements the OpenCensus metric Exporter interface using an
// OpenTelemetry base exporter.
type exporter struct {
base export.Exporter
}
// ExportMetrics implements the OpenCensus metric Exporter interface
func (e *exporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error {
return e.base.Export(ctx, &checkpointSet{metrics: metrics})
}
type checkpointSet struct {
// RWMutex implements locking for the `CheckpointSet` interface.
sync.RWMutex
metrics []*metricdata.Metric
}
// ForEach iterates through the CheckpointSet, passing an
// export.Record with the appropriate aggregation to an exporter.
func (d *checkpointSet) ForEach(exporter export.ExportKindSelector, f func(export.Record) error) error {
for _, m := range d.metrics {
descriptor, err := convertDescriptor(m.Descriptor)
if err != nil {
otel.Handle(err)
continue
}
res := convertResource(m.Resource)
for _, ts := range m.TimeSeries {
if len(ts.Points) == 0 {
continue
}
ls, err := convertLabels(m.Descriptor.LabelKeys, ts.LabelValues)
if err != nil {
otel.Handle(err)
continue
}
agg, err := newAggregationFromPoints(ts.Points)
if err != nil {
otel.Handle(err)
continue
}
if err := f(export.NewRecord(
&descriptor,
&ls,
res,
agg,
ts.StartTime,
agg.end(),
)); err != nil && !errors.Is(err, aggregation.ErrNoData) {
return err
}
}
}
return nil
}
// convertLabels converts from OpenCensus label keys and values to an
// OpenTelemetry label Set.
func convertLabels(keys []metricdata.LabelKey, values []metricdata.LabelValue) (attribute.Set, error) {
if len(keys) != len(values) {
return attribute.NewSet(), fmt.Errorf("%w different number of label keys (%d) and values (%d)", errConversion, len(keys), len(values))
}
labels := []attribute.KeyValue{}
for i, lv := range values {
if !lv.Present {
continue
}
labels = append(labels, attribute.KeyValue{
Key: attribute.Key(keys[i].Key),
Value: attribute.StringValue(lv.Value),
})
}
return attribute.NewSet(labels...), nil
}
// convertResource converts an OpenCensus Resource to an OpenTelemetry Resource
func convertResource(res *ocresource.Resource) *resource.Resource {
labels := []attribute.KeyValue{}
if res == nil {
return nil
}
for k, v := range res.Labels {
labels = append(labels, attribute.KeyValue{Key: attribute.Key(k), Value: attribute.StringValue(v)})
}
return resource.NewWithAttributes(labels...)
}
// convertDescriptor converts an OpenCensus Descriptor to an OpenTelemetry Descriptor
func convertDescriptor(ocDescriptor metricdata.Descriptor) (metric.Descriptor, error) {
var (
nkind number.Kind
ikind metric.InstrumentKind
)
switch ocDescriptor.Type {
case metricdata.TypeGaugeInt64:
nkind = number.Int64Kind
ikind = metric.ValueObserverInstrumentKind
case metricdata.TypeGaugeFloat64:
nkind = number.Float64Kind
ikind = metric.ValueObserverInstrumentKind
case metricdata.TypeCumulativeInt64:
nkind = number.Int64Kind
ikind = metric.SumObserverInstrumentKind
case metricdata.TypeCumulativeFloat64:
nkind = number.Float64Kind
ikind = metric.SumObserverInstrumentKind
default:
// Includes TypeGaugeDistribution, TypeCumulativeDistribution, TypeSummary
return metric.Descriptor{}, fmt.Errorf("%w; descriptor type: %v", errConversion, ocDescriptor.Type)
}
opts := []metric.InstrumentOption{
metric.WithDescription(ocDescriptor.Description),
metric.WithInstrumentationName("OpenCensus Bridge"),
}
switch ocDescriptor.Unit {
case metricdata.UnitDimensionless:
opts = append(opts, metric.WithUnit(unit.Dimensionless))
case metricdata.UnitBytes:
opts = append(opts, metric.WithUnit(unit.Bytes))
case metricdata.UnitMilliseconds:
opts = append(opts, metric.WithUnit(unit.Milliseconds))
}
return metric.NewDescriptor(ocDescriptor.Name, ikind, nkind, opts...), nil
}

@ -0,0 +1,480 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opencensus
import (
"context"
"errors"
"fmt"
"testing"
"time"
"go.opentelemetry.io/otel"
"go.opencensus.io/metric/metricdata"
ocresource "go.opencensus.io/resource"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
export "go.opentelemetry.io/otel/sdk/export/metric"
exportmetric "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/unit"
)
type fakeExporter struct {
export.Exporter
records []export.Record
err error
}
func (f *fakeExporter) Export(ctx context.Context, cps exportmetric.CheckpointSet) error {
return cps.ForEach(f, func(record exportmetric.Record) error {
f.records = append(f.records, record)
return f.err
})
}
type fakeErrorHandler struct {
err error
}
func (f *fakeErrorHandler) Handle(err error) {
f.err = err
}
func (f *fakeErrorHandler) matches(err error) error {
// make sure err is cleared for the next test
defer func() { f.err = nil }()
if !errors.Is(f.err, err) {
return fmt.Errorf("err(%v), want err(%v)", f.err, err)
}
return nil
}
func TestExportMetrics(t *testing.T) {
now := time.Now()
basicDesc := metric.NewDescriptor(
"",
metric.ValueObserverInstrumentKind,
number.Int64Kind,
metric.WithInstrumentationName("OpenCensus Bridge"),
)
fakeErrorHandler := &fakeErrorHandler{}
otel.SetErrorHandler(fakeErrorHandler)
for _, tc := range []struct {
desc string
input []*metricdata.Metric
exportErr error
expected []export.Record
expectedHandledError error
}{
{
desc: "no metrics",
},
{
desc: "metric without points is dropped",
input: []*metricdata.Metric{
{
TimeSeries: []*metricdata.TimeSeries{
{},
},
},
},
},
{
desc: "descriptor conversion error",
input: []*metricdata.Metric{
// TypeGaugeDistribution isn't supported
{Descriptor: metricdata.Descriptor{Type: metricdata.TypeGaugeDistribution}},
},
expectedHandledError: errConversion,
},
{
desc: "labels conversion error",
input: []*metricdata.Metric{
{
// No descriptor with label keys.
TimeSeries: []*metricdata.TimeSeries{
// 1 label value, which doens't exist in keys.
{
LabelValues: []metricdata.LabelValue{{Value: "foo", Present: true}},
Points: []metricdata.Point{
{},
},
},
},
},
},
expectedHandledError: errConversion,
},
{
desc: "unsupported summary point type",
input: []*metricdata.Metric{
{
TimeSeries: []*metricdata.TimeSeries{
{
Points: []metricdata.Point{
{Value: &metricdata.Summary{}},
},
},
},
},
},
expectedHandledError: errIncompatibleType,
},
{
desc: "success",
input: []*metricdata.Metric{
{
TimeSeries: []*metricdata.TimeSeries{
{
StartTime: now,
Points: []metricdata.Point{
{Value: int64(123), Time: now},
},
},
},
},
},
expected: []export.Record{
export.NewRecord(
&basicDesc,
attribute.EmptySet(),
resource.NewWithAttributes(),
&ocExactAggregator{
points: []aggregation.Point{
{
Number: number.NewInt64Number(123),
Time: now,
},
},
},
now,
now,
),
},
},
{
desc: "export error after success",
input: []*metricdata.Metric{
{
TimeSeries: []*metricdata.TimeSeries{
{
StartTime: now,
Points: []metricdata.Point{
{Value: int64(123), Time: now},
},
},
},
},
},
expected: []export.Record{
export.NewRecord(
&basicDesc,
attribute.EmptySet(),
resource.NewWithAttributes(),
&ocExactAggregator{
points: []aggregation.Point{
{
Number: number.NewInt64Number(123),
Time: now,
},
},
},
now,
now,
),
},
exportErr: errors.New("failed to export"),
},
{
desc: "partial success sends correct metrics and drops incorrect metrics with handled err",
input: []*metricdata.Metric{
{
TimeSeries: []*metricdata.TimeSeries{
{
StartTime: now,
Points: []metricdata.Point{
{Value: int64(123), Time: now},
},
},
},
},
// TypeGaugeDistribution isn't supported
{Descriptor: metricdata.Descriptor{Type: metricdata.TypeGaugeDistribution}},
},
expected: []export.Record{
export.NewRecord(
&basicDesc,
attribute.EmptySet(),
resource.NewWithAttributes(),
&ocExactAggregator{
points: []aggregation.Point{
{
Number: number.NewInt64Number(123),
Time: now,
},
},
},
now,
now,
),
},
expectedHandledError: errConversion,
},
} {
t.Run(tc.desc, func(t *testing.T) {
fakeExporter := &fakeExporter{err: tc.exportErr}
err := NewMetricExporter(fakeExporter).ExportMetrics(context.Background(), tc.input)
if !errors.Is(err, tc.exportErr) {
t.Errorf("NewMetricExporter(%+v) = err(%v), want err(%v)", tc.input, err, tc.exportErr)
}
// Check the global error handler, since we don't return errors
// which occur during conversion.
err = fakeErrorHandler.matches(tc.expectedHandledError)
if err != nil {
t.Fatalf("ExportMetrics(%+v) = %v", tc.input, err)
}
output := fakeExporter.records
if len(tc.expected) != len(output) {
t.Fatalf("ExportMetrics(%+v) = %d records, want %d records", tc.input, len(output), len(tc.expected))
}
for i, expected := range tc.expected {
if output[i].StartTime() != expected.StartTime() {
t.Errorf("ExportMetrics(%+v)[i].StartTime() = %+v, want %+v", tc.input, output[i].StartTime(), expected.StartTime())
}
if output[i].EndTime() != expected.EndTime() {
t.Errorf("ExportMetrics(%+v)[i].EndTime() = %+v, want %+v", tc.input, output[i].EndTime(), expected.EndTime())
}
if output[i].Resource().String() != expected.Resource().String() {
t.Errorf("ExportMetrics(%+v)[i].Resource() = %+v, want %+v", tc.input, output[i].Resource().String(), expected.Resource().String())
}
if output[i].Descriptor().Name() != expected.Descriptor().Name() {
t.Errorf("ExportMetrics(%+v)[i].Descriptor() = %+v, want %+v", tc.input, output[i].Descriptor().Name(), expected.Descriptor().Name())
}
// Don't bother with a complete check of the descriptor.
// That is checked as part of descriptor conversion tests below.
if !output[i].Labels().Equals(expected.Labels()) {
t.Errorf("ExportMetrics(%+v)[i].Labels() = %+v, want %+v", tc.input, output[i].Labels(), expected.Labels())
}
if output[i].Aggregation().Kind() != expected.Aggregation().Kind() {
t.Errorf("ExportMetrics(%+v)[i].Aggregation() = %+v, want %+v", tc.input, output[i].Aggregation().Kind(), expected.Aggregation().Kind())
}
// Don't bother checking the contents of the points aggregation.
// Those tests are done with the aggregations themselves
}
})
}
}
func TestConvertLabels(t *testing.T) {
setWithMultipleKeys := attribute.NewSet(
attribute.KeyValue{Key: attribute.Key("first"), Value: attribute.StringValue("1")},
attribute.KeyValue{Key: attribute.Key("second"), Value: attribute.StringValue("2")},
)
for _, tc := range []struct {
desc string
inputKeys []metricdata.LabelKey
inputValues []metricdata.LabelValue
expected *attribute.Set
expectedErr error
}{
{
desc: "no labels",
expected: attribute.EmptySet(),
},
{
desc: "different numbers of keys and values",
inputKeys: []metricdata.LabelKey{{Key: "foo"}},
expected: attribute.EmptySet(),
expectedErr: errConversion,
},
{
desc: "multiple keys and values",
inputKeys: []metricdata.LabelKey{{Key: "first"}, {Key: "second"}},
inputValues: []metricdata.LabelValue{
{Value: "1", Present: true},
{Value: "2", Present: true},
},
expected: &setWithMultipleKeys,
},
{
desc: "multiple keys and values with some not present",
inputKeys: []metricdata.LabelKey{{Key: "first"}, {Key: "second"}, {Key: "third"}},
inputValues: []metricdata.LabelValue{
{Value: "1", Present: true},
{Value: "2", Present: true},
{Present: false},
},
expected: &setWithMultipleKeys,
},
} {
t.Run(tc.desc, func(t *testing.T) {
output, err := convertLabels(tc.inputKeys, tc.inputValues)
if !errors.Is(err, tc.expectedErr) {
t.Errorf("convertLabels(keys: %v, values: %v) = err(%v), want err(%v)", tc.inputKeys, tc.inputValues, err, tc.expectedErr)
}
if !output.Equals(tc.expected) {
t.Errorf("convertLabels(keys: %v, values: %v) = %+v, want %+v", tc.inputKeys, tc.inputValues, output.ToSlice(), tc.expected.ToSlice())
}
})
}
}
func TestConvertResource(t *testing.T) {
for _, tc := range []struct {
desc string
input *ocresource.Resource
expected *resource.Resource
}{
{
desc: "nil resource",
},
{
desc: "empty resource",
input: &ocresource.Resource{
Labels: map[string]string{},
},
expected: resource.NewWithAttributes(),
},
{
desc: "resource with labels",
input: &ocresource.Resource{
Labels: map[string]string{
"foo": "bar",
"tick": "tock",
},
},
expected: resource.NewWithAttributes(
attribute.KeyValue{Key: attribute.Key("foo"), Value: attribute.StringValue("bar")},
attribute.KeyValue{Key: attribute.Key("tick"), Value: attribute.StringValue("tock")},
),
},
} {
t.Run(tc.desc, func(t *testing.T) {
output := convertResource(tc.input)
if !output.Equal(tc.expected) {
t.Errorf("convertResource(%v) = %+v, want %+v", tc.input, output, tc.expected)
}
})
}
}
func TestConvertDescriptor(t *testing.T) {
for _, tc := range []struct {
desc string
input metricdata.Descriptor
expected metric.Descriptor
expectedErr error
}{
{
desc: "empty descriptor",
expected: metric.NewDescriptor(
"",
metric.ValueObserverInstrumentKind,
number.Int64Kind,
metric.WithInstrumentationName("OpenCensus Bridge"),
),
},
{
desc: "gauge int64 bytes",
input: metricdata.Descriptor{
Name: "foo",
Description: "bar",
Unit: metricdata.UnitBytes,
Type: metricdata.TypeGaugeInt64,
},
expected: metric.NewDescriptor(
"foo",
metric.ValueObserverInstrumentKind,
number.Int64Kind,
metric.WithInstrumentationName("OpenCensus Bridge"),
metric.WithDescription("bar"),
metric.WithUnit(unit.Bytes),
),
},
{
desc: "gauge float64 ms",
input: metricdata.Descriptor{
Name: "foo",
Description: "bar",
Unit: metricdata.UnitMilliseconds,
Type: metricdata.TypeGaugeFloat64,
},
expected: metric.NewDescriptor(
"foo",
metric.ValueObserverInstrumentKind,
number.Float64Kind,
metric.WithInstrumentationName("OpenCensus Bridge"),
metric.WithDescription("bar"),
metric.WithUnit(unit.Milliseconds),
),
},
{
desc: "cumulative int64 dimensionless",
input: metricdata.Descriptor{
Name: "foo",
Description: "bar",
Unit: metricdata.UnitDimensionless,
Type: metricdata.TypeCumulativeInt64,
},
expected: metric.NewDescriptor(
"foo",
metric.SumObserverInstrumentKind,
number.Int64Kind,
metric.WithInstrumentationName("OpenCensus Bridge"),
metric.WithDescription("bar"),
metric.WithUnit(unit.Dimensionless),
),
},
{
desc: "cumulative float64 dimensionless",
input: metricdata.Descriptor{
Name: "foo",
Description: "bar",
Unit: metricdata.UnitDimensionless,
Type: metricdata.TypeCumulativeFloat64,
},
expected: metric.NewDescriptor(
"foo",
metric.SumObserverInstrumentKind,
number.Float64Kind,
metric.WithInstrumentationName("OpenCensus Bridge"),
metric.WithDescription("bar"),
metric.WithUnit(unit.Dimensionless),
),
},
{
desc: "incompatible TypeCumulativeDistribution",
input: metricdata.Descriptor{
Name: "foo",
Description: "bar",
Type: metricdata.TypeCumulativeDistribution,
},
expectedErr: errConversion,
},
} {
t.Run(tc.desc, func(t *testing.T) {
output, err := convertDescriptor(tc.input)
if !errors.Is(err, tc.expectedErr) {
t.Errorf("convertDescriptor(%v) = err(%v), want err(%v)", tc.input, err, tc.expectedErr)
}
if output != tc.expected {
t.Errorf("convertDescriptor(%v) = %+v, want %+v", tc.input, output, tc.expected)
}
})
}
}

@ -5,7 +5,10 @@ go 1.14
require (
go.opencensus.io v0.22.6-0.20201102222123-380f4078db9f
go.opentelemetry.io/otel v0.18.0
go.opentelemetry.io/otel/metric v0.18.0
go.opentelemetry.io/otel/oteltest v0.18.0
go.opentelemetry.io/otel/sdk v0.18.0
go.opentelemetry.io/otel/sdk/export/metric v0.0.0-00010101000000-000000000000
go.opentelemetry.io/otel/trace v0.18.0
)

@ -8,6 +8,7 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=

@ -15,6 +15,7 @@ require (
go.opentelemetry.io/otel/bridge/opencensus v0.18.0
go.opentelemetry.io/otel/exporters/stdout v0.18.0
go.opentelemetry.io/otel/sdk v0.18.0
go.opentelemetry.io/otel/sdk/export/metric v0.18.0
)
replace go.opentelemetry.io/otel/bridge/opentracing => ../../bridge/opentracing

@ -17,26 +17,58 @@ package main
import (
"context"
"log"
"time"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric"
"go.opencensus.io/metric/metricexport"
"go.opencensus.io/metric/metricproducer"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
octrace "go.opencensus.io/trace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/bridge/opencensus"
"go.opentelemetry.io/otel/exporters/stdout"
otmetricexport "go.opentelemetry.io/otel/sdk/export/metric"
ottraceexport "go.opentelemetry.io/otel/sdk/export/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
var (
// instrumenttype differentiates between our gauge and view metrics.
keyType = tag.MustNewKey("instrumenttype")
// Counts the number of lines read in from standard input
countMeasure = stats.Int64("test_count", "A count of something", stats.UnitDimensionless)
countView = &view.View{
Name: "test_count",
Measure: countMeasure,
Description: "A count of something",
Aggregation: view.Count(),
TagKeys: []tag.Key{keyType},
}
)
func main() {
log.Println("Using OpenTelemetry stdout exporter.")
otExporter, err := stdout.NewExporter(stdout.WithPrettyPrint())
if err != nil {
log.Fatal(err)
}
tracing(otExporter)
monitoring(otExporter)
}
// tracing demonstrates overriding the OpenCensus DefaultTracer to send spans
// to the OpenTelemetry exporter by calling OpenCensus APIs.
func tracing(otExporter ottraceexport.SpanExporter) {
ctx := context.Background()
log.Println("Configuring OpenCensus. Not Registering any OpenCensus exporters.")
octrace.ApplyConfig(octrace.Config{DefaultSampler: octrace.AlwaysSample()})
log.Println("Registering OpenTelemetry stdout exporter.")
otExporter, err := stdout.NewExporter(stdout.WithPrettyPrint())
if err != nil {
log.Fatal(err)
}
tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(otExporter))
otel.SetTracerProvider(tp)
@ -56,3 +88,56 @@ func main() {
_, innerOCSpan := octrace.StartSpan(ctx, "OpenCensusInnerSpan")
innerOCSpan.End()
}
// monitoring demonstrates creating an IntervalReader using the OpenTelemetry
// exporter to send metrics to the exporter by using either an OpenCensus
// registry or an OpenCensus view.
func monitoring(otExporter otmetricexport.Exporter) {
log.Println("Using the OpenTelemetry stdout exporter to export OpenCensus metrics. This allows routing telemetry from both OpenTelemetry and OpenCensus to a single exporter.")
ocExporter := opencensus.NewMetricExporter(otExporter)
intervalReader, err := metricexport.NewIntervalReader(&metricexport.Reader{}, ocExporter)
if err != nil {
log.Fatalf("Failed to create interval reader: %v\n", err)
}
intervalReader.ReportingInterval = 10 * time.Second
log.Println("Emitting metrics using OpenCensus APIs. These should be printed out using the OpenTelemetry stdout exporter.")
err = intervalReader.Start()
if err != nil {
log.Fatalf("Failed to start interval reader: %v\n", err)
}
defer intervalReader.Stop()
log.Println("Registering a gauge metric using an OpenCensus registry.")
r := metric.NewRegistry()
metricproducer.GlobalManager().AddProducer(r)
gauge, err := r.AddInt64Gauge(
"test_gauge",
metric.WithDescription("A gauge for testing"),
metric.WithConstLabel(map[metricdata.LabelKey]metricdata.LabelValue{
{Key: keyType.Name()}: metricdata.NewLabelValue("gauge"),
}),
)
if err != nil {
log.Fatalf("Failed to add gauge: %v\n", err)
}
entry, err := gauge.GetEntry()
if err != nil {
log.Fatalf("Failed to get gauge entry: %v\n", err)
}
log.Println("Registering a cumulative metric using an OpenCensus view.")
if err := view.Register(countView); err != nil {
log.Fatalf("Failed to register views: %v", err)
}
ctx, err := tag.New(context.Background(), tag.Insert(keyType, "view"))
if err != nil {
log.Fatalf("Failed to set tag: %v\n", err)
}
for i := int64(1); true; i++ {
// update stats for our gauge
entry.Set(i)
// update stats for our view
stats.Record(ctx, countMeasure.M(1))
time.Sleep(time.Second)
}
}