mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-10-31 00:07:40 +02:00

Dogstatsd metrics exporter (#326)

* Add MetricAggregator.Merge() implementations

* Update from feedback

* Type

* Ckpt

* Ckpt

* Add push controller

* Ckpt

* Add aggregator interfaces, stdout encoder

* Modify basic main.go

* Main is working

* Batch stdout output

* Sum update

* Rename stdout

* Add stateless/stateful Batcher options

* Undo a for-loop in the example, remove a done TODO

* Update imports

* Add note

* Rename defaultkeys

* Support variable label encoder to speed OpenMetrics/Statsd export

* Lint

* Checkpoint

* Checkpoint

* Doc

* Precommit/lint

* Simplify Aggregator API

* Record->Identifier

* Remove export.Record a.k.a. Identifier

* Checkpoint

* Propagate errors to the SDK, remove a bunch of 'TODO warn'

* Checkpoint

* Introduce export.Labels

* Comments in export/metric.go

* Comment

* More merge

* More doc

* Complete example

* Lint fixes

* Add a testable example

* Lint

* Dogstats

* Let Export return an error

* Checkpoint

* add a basic stdout exporter test

* Add measure test; fix aggregator APIs

* Use JSON numbers, not strings

* Test stdout exporter error

* Add a test for the call to RangeTest

* Add error handler API to improve correctness test; return errors from RecordOne

* Undo the previous -- do not expose errors

* Add simple selector variations, test

* Repair examples

* Test push controller error handling

* Add SDK label encoder tests

* Add a defaultkeys batcher test

* Add an ungrouped batcher test

* Lint new tests

* Respond to krnowak's feedback

* Checkpoint

* Functional example using unixgram

* Tidy the example

* Add a packet-split test

* More tests

* Undo comment

* Use concrete receivers for export records and labels, since the constructors return structs not pointers

* Bug fix for stateful batchers; clone an aggregator for long term storage

* Remove TODO addressed in #318

* Add errors to all aggregator interfaces

* Handle ErrNoLastValue case in stdout exporter

* Move aggregator API into sdk/export/metric/aggregator

* Update all aggregator exported-method comments

* Document the aggregator APIs

* More aggregator comments

* Add multiple updates to the ungrouped test

* Fixes for feedback from Gustavo and Liz

* Producer->CheckpointSet; add FinishedCollection

* Process takes an export.Record

* ReadCheckpoint->CheckpointSet

* EncodeLabels->Encode

* Format a better inconsistent type error; add more aggregator API tests

* More RangeTest test coverage

* Make benbjohnson/clock a test-only dependency

* Handle ErrNoLastValue in stress_test

* Update comments; use a pipe vs a unix socket in the example test

* Update test

* Spelling

* Typo fix

* Rename DefaultLabelEncoder to NewDefaultLabelEncoder for clarity

* Rename DefaultLabelEncoder to NewDefaultLabelEncoder for clarity

* Test different adapters; add ForceEncode to statsd label encoder

Author: Joshua MacDonald
Date: 2019-11-21 20:46:05 -08:00
Committed by: GitHub
Parent: 6b632815f8
Commit: b9706b20f9
20 changed files with 1058 additions and 30 deletions

View File

@@ -65,7 +65,7 @@ func initMeter() *push.Controller {
if err != nil {
log.Panicf("failed to initialize metric stdout exporter %v", err)
}
-batcher := defaultkeys.New(selector, metricsdk.DefaultLabelEncoder(), true)
+batcher := defaultkeys.New(selector, metricsdk.NewDefaultLabelEncoder(), true)
pusher := push.New(batcher, exporter, time.Second)
pusher.Start()

View File

@@ -0,0 +1,75 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dogstatsd // import "go.opentelemetry.io/otel/exporter/metric/dogstatsd"
import (
"bytes"
"go.opentelemetry.io/otel/exporter/metric/internal/statsd"
export "go.opentelemetry.io/otel/sdk/export/metric"
)
type (
Config = statsd.Config
// Exporter implements a dogstatsd-format statsd exporter,
// which encodes label sets as independent fields in the
// output.
//
// TODO: find a link for this syntax. It's been copied out of
// code, not a specification:
//
// https://github.com/stripe/veneur/blob/master/sinks/datadog/datadog.go
Exporter struct {
*statsd.Exporter
*statsd.LabelEncoder
ReencodedLabelsCount int
}
)
var (
_ export.Exporter = &Exporter{}
_ export.LabelEncoder = &Exporter{}
)
// New returns a new Dogstatsd-syntax exporter. This type implements
// the metric.LabelEncoder interface, allowing the SDK's unique label
// encoding to be pre-computed for the exporter and stored in the
// LabelSet.
func New(config Config) (*Exporter, error) {
exp := &Exporter{
LabelEncoder: statsd.NewLabelEncoder(),
}
var err error
exp.Exporter, err = statsd.NewExporter(config, exp)
return exp, err
}
// AppendName is part of the stats-internal adapter interface.
func (*Exporter) AppendName(rec export.Record, buf *bytes.Buffer) {
_, _ = buf.WriteString(rec.Descriptor().Name())
}
// AppendTags is part of the stats-internal adapter interface.
func (e *Exporter) AppendTags(rec export.Record, buf *bytes.Buffer) {
encoded, inefficient := e.LabelEncoder.ForceEncode(rec.Labels())
_, _ = buf.WriteString(encoded)
if inefficient {
e.ReencodedLabelsCount++
}
}
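
For reference, the dogstatsd adapter emits the plain statsd line syntax with a trailing "|#key:value,..." tag section. A minimal, hypothetical sketch (not part of this commit) of how AppendName, the shared statsd formatting, and AppendTags compose into the single counter line checked by the test below:

package main

import (
	"bytes"
	"fmt"
)

func main() {
	var buf bytes.Buffer
	buf.WriteString("test.name") // AppendName writes the descriptor name
	buf.WriteString(":123")      // the statsd core writes ':' and the value
	buf.WriteString("|c")        // then '|' and the counter type code
	buf.WriteString("|#A:B")     // AppendTags writes the encoded label set
	buf.WriteByte('\n')          // each statistic ends in a newline
	fmt.Print(buf.String())      // prints: test.name:123|c|#A:B
}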

View File

@@ -0,0 +1,69 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dogstatsd_test
import (
"bytes"
"context"
"fmt"
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/exporter/metric/dogstatsd"
"go.opentelemetry.io/otel/exporter/metric/internal/statsd"
"go.opentelemetry.io/otel/exporter/metric/test"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/counter"
)
// TestDogstatsLabels tests that labels are formatted in the correct style,
// whether or not the provided labels were encoded by a statsd label
// encoder.
func TestDogstatsLabels(t *testing.T) {
for inefficientCount, encoder := range []export.LabelEncoder{
statsd.NewLabelEncoder(), // inefficientCount == 0
sdk.NewDefaultLabelEncoder(), // inefficientCount == 1
} {
t.Run(fmt.Sprintf("%T", encoder), func(t *testing.T) {
ctx := context.Background()
checkpointSet := test.NewCheckpointSet(encoder)
desc := export.NewDescriptor("test.name", export.CounterKind, nil, "", "", core.Int64NumberKind, false)
cagg := counter.New()
_ = cagg.Update(ctx, core.NewInt64Number(123), desc)
cagg.Checkpoint(ctx, desc)
checkpointSet.Add(desc, cagg, key.New("A").String("B"))
var buf bytes.Buffer
exp, err := dogstatsd.New(dogstatsd.Config{
Writer: &buf,
})
require.Nil(t, err)
require.Equal(t, 0, exp.ReencodedLabelsCount)
err = exp.Export(ctx, checkpointSet)
require.Nil(t, err)
require.Equal(t, inefficientCount, exp.ReencodedLabelsCount)
require.Equal(t, "test.name:123|c|#A:B\n", buf.String())
})
}
}

View File

@@ -0,0 +1,87 @@
package dogstatsd_test
import (
"context"
"fmt"
"io"
"log"
"sync"
"time"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporter/metric/dogstatsd"
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)
func ExampleNew() {
// Create a "server"
wg := &sync.WaitGroup{}
wg.Add(1)
reader, writer := io.Pipe()
go func() {
defer wg.Done()
for {
var buf [4096]byte
n, err := reader.Read(buf[:])
if err == io.EOF {
return
} else if err != nil {
log.Fatal("Read err: ", err)
} else if n >= len(buf) {
log.Fatal("Read small buffer: ", n)
} else {
fmt.Print(string(buf[0:n]))
}
}
}()
// Create a meter
selector := simple.NewWithExactMeasure()
exporter, err := dogstatsd.New(dogstatsd.Config{
// The Writer field provides test support.
Writer: writer,
// In real code, use the URL field:
//
// URL: fmt.Sprint("unix://", path),
})
if err != nil {
log.Fatal("Could not initialize dogstatsd exporter:", err)
}
// The ungrouped batcher ensures that the export sees the full
// set of labels as dogstatsd tags.
batcher := ungrouped.New(selector, false)
// The pusher automatically recognizes that the exporter
// implements the LabelEncoder interface, which ensures that the
// exporter's label encoding is pre-computed in the LabelSet.
pusher := push.New(batcher, exporter, time.Hour)
pusher.Start()
ctx := context.Background()
key := key.New("key")
// pusher implements the metric.MeterProvider interface:
meter := pusher.GetMeter("example")
// Create and update a single counter:
counter := meter.NewInt64Counter("a.counter", metric.WithKeys(key))
labels := meter.Labels(key.String("value"))
counter.Add(ctx, 100, labels)
// Flush the exporter, close the pipe, and wait for the reader.
pusher.Stop()
writer.Close()
wg.Wait()
// Output:
// a.counter:100|c|#key:value
}
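
The example above uses the Writer field purely as a test hook. A hypothetical sketch (not part of this commit) of a non-test configuration that points the exporter at a local statsd/dogstatsd agent over UDP; the agent address and push interval here are assumptions:

package main

import (
	"log"
	"time"

	"go.opentelemetry.io/otel/exporter/metric/dogstatsd"
	"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
	"go.opentelemetry.io/otel/sdk/metric/controller/push"
	"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

func main() {
	exporter, err := dogstatsd.New(dogstatsd.Config{
		URL: "udp://127.0.0.1:8125", // assumed local agent address
	})
	if err != nil {
		log.Fatal("Could not initialize dogstatsd exporter:", err)
	}
	// Same pipeline as the example: ungrouped batcher, push controller.
	batcher := ungrouped.New(simple.NewWithExactMeasure(), false)
	pusher := push.New(batcher, exporter, 10*time.Second)
	pusher.Start()
	defer pusher.Stop()
	// Instruments created via pusher.GetMeter(...) now export
	// dogstatsd lines to the agent on every push interval.
}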

View File

@@ -0,0 +1,288 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package statsd
// See https://github.com/b/statsd_spec for the best-available statsd
// syntax specification. See also
// https://github.com/statsd/statsd/edit/master/docs/metric_types.md
import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/url"
"strconv"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/unit"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
)
type (
// Config supports common options that apply to statsd exporters.
Config struct {
// URL describes the destination for exporting statsd data.
// e.g., udp://host:port
// tcp://host:port
// unix:///socket/path
URL string
// Writer is an alternative to providing a URL. When Writer is
// non-nil, URL will be ignored and the exporter will write to
// the configured Writer interface.
Writer io.Writer
// MaxPacketSize limits the packet size for packet-oriented transports.
MaxPacketSize int
// TODO support Dial and Write timeouts
}
// Exporter is a common type used to implement concrete statsd
// exporters.
Exporter struct {
adapter Adapter
config Config
conn net.Conn
writer io.Writer
buffer bytes.Buffer
}
// Adapter supports statsd syntax variations, primarily plain
// statsd vs. dogstatsd.
Adapter interface {
AppendName(export.Record, *bytes.Buffer)
AppendTags(export.Record, *bytes.Buffer)
}
)
const (
formatCounter = "c"
formatHistogram = "h"
formatGauge = "g"
formatTiming = "ms"
MaxPacketSize = 1 << 16
)
var (
_ export.Exporter = &Exporter{}
ErrInvalidScheme = fmt.Errorf("Invalid statsd transport")
)
// NewExporter returns a common implementation for exporters that export
// statsd syntax.
func NewExporter(config Config, adapter Adapter) (*Exporter, error) {
if config.MaxPacketSize <= 0 {
config.MaxPacketSize = MaxPacketSize
}
var writer io.Writer
var conn net.Conn
var err error
if config.Writer != nil {
writer = config.Writer
} else {
conn, err = dial(config.URL)
if conn != nil {
writer = conn
}
}
// TODO: If err != nil, we return it _with_ a valid exporter; the
// exporter should attempt to re-dial if it's retryable. Add a
// Start() and Stop() API.
return &Exporter{
adapter: adapter,
config: config,
conn: conn,
writer: writer,
}, err
}
// dial connects to a statsd service using several common network
// types. Presently "udp" and "unix" datagram socket connections are
// supported.
func dial(endpoint string) (net.Conn, error) {
dest, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
// TODO: Support tcp destination, need configurable timeouts first.
scheme := dest.Scheme
switch scheme {
case "udp", "udp4", "udp6":
udpAddr, err := net.ResolveUDPAddr(scheme, dest.Host)
locAddr := &net.UDPAddr{}
if err != nil {
return nil, err
}
conn, err := net.DialUDP(scheme, locAddr, udpAddr)
if err != nil {
return nil, err
}
return conn, err
case "unix", "unixgram":
scheme = "unixgram"
locAddr := &net.UnixAddr{}
sockAddr, err := net.ResolveUnixAddr(scheme, dest.Path)
if err != nil {
return nil, err
}
conn, err := net.DialUnix(scheme, locAddr, sockAddr)
if err != nil {
return nil, err
}
return conn, err
}
return nil, ErrInvalidScheme
}
// Export is common code for any statsd-based metric.Exporter implementation.
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
buf := &e.buffer
buf.Reset()
var aggErr error
var sendErr error
checkpointSet.ForEach(func(rec export.Record) {
before := buf.Len()
if err := e.formatMetric(rec, buf); err != nil && aggErr == nil {
aggErr = err
return
}
if buf.Len() < e.config.MaxPacketSize {
return
}
if before == 0 {
// A single metric >= packet size
if err := e.send(buf.Bytes()); err != nil && sendErr == nil {
sendErr = err
}
buf.Reset()
return
}
// Send and copy the leftover
if err := e.send(buf.Bytes()[:before]); err != nil && sendErr == nil {
sendErr = err
}
leftover := buf.Len() - before
copy(buf.Bytes()[0:leftover], buf.Bytes()[before:])
buf.Truncate(leftover)
})
if err := e.send(buf.Bytes()); err != nil && sendErr == nil {
sendErr = err
}
if sendErr != nil {
return sendErr
}
return aggErr
}
// send writes a complete buffer to the writer as a blocking call.
func (e *Exporter) send(buf []byte) error {
for len(buf) != 0 {
n, err := e.writer.Write(buf)
if err != nil {
return err
}
buf = buf[n:]
}
return nil
}
// formatMetric formats an individual export record. For some records
// this will emit a single statistic, for some it will emit more than
// one.
func (e *Exporter) formatMetric(rec export.Record, buf *bytes.Buffer) error {
desc := rec.Descriptor()
agg := rec.Aggregator()
// TODO handle non-Points Distribution/MaxSumCount by
// formatting individual quantiles, the sum, and the count as
// single statistics. For the dogstatsd variation, assuming
// open-source systems like Veneur add support, figure out the
// proper encoding for "d"-type distribution data.
if pts, ok := agg.(aggregator.Points); ok {
var format string
if desc.Unit() == unit.Milliseconds {
format = formatTiming
} else {
format = formatHistogram
}
points, err := pts.Points()
if err != nil {
return err
}
for _, pt := range points {
e.formatSingleStat(rec, pt, format, buf)
}
} else if sum, ok := agg.(aggregator.Sum); ok {
sum, err := sum.Sum()
if err != nil {
return err
}
e.formatSingleStat(rec, sum, formatCounter, buf)
} else if lv, ok := agg.(aggregator.LastValue); ok {
lv, _, err := lv.LastValue()
if err != nil {
return err
}
e.formatSingleStat(rec, lv, formatGauge, buf)
}
return nil
}
// formatSingleStat encodes a single item of statsd data followed by a
// newline.
func (e *Exporter) formatSingleStat(rec export.Record, val core.Number, fmtStr string, buf *bytes.Buffer) {
e.adapter.AppendName(rec, buf)
_, _ = buf.WriteRune(':')
writeNumber(buf, val, rec.Descriptor().NumberKind())
_, _ = buf.WriteRune('|')
_, _ = buf.WriteString(fmtStr)
e.adapter.AppendTags(rec, buf)
_, _ = buf.WriteRune('\n')
}
func writeNumber(buf *bytes.Buffer, num core.Number, kind core.NumberKind) {
var tmp [128]byte
var conv []byte
switch kind {
case core.Int64NumberKind:
conv = strconv.AppendInt(tmp[:0], num.AsInt64(), 10)
case core.Float64NumberKind:
conv = strconv.AppendFloat(tmp[:0], num.AsFloat64(), 'g', -1, 64)
case core.Uint64NumberKind:
conv = strconv.AppendUint(tmp[:0], num.AsUint64(), 10)
}
_, _ = buf.Write(conv)
}
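
The Adapter interface above is the only piece a concrete exporter must supply. A minimal, hypothetical sketch (not part of this commit) of a plain-statsd adapter wired into NewExporter; note that this statsd package is internal to the exporter module, so only in-tree packages (such as the dogstatsd exporter and the tests below) can actually implement it:

package statsdplain

import (
	"bytes"
	"os"

	"go.opentelemetry.io/otel/exporter/metric/internal/statsd"
	export "go.opentelemetry.io/otel/sdk/export/metric"
)

// plainAdapter writes only the metric name and no tag section,
// producing plain (non-dogstatsd) statsd lines.
type plainAdapter struct{}

func (plainAdapter) AppendName(rec export.Record, buf *bytes.Buffer) {
	_, _ = buf.WriteString(rec.Descriptor().Name())
}

func (plainAdapter) AppendTags(export.Record, *bytes.Buffer) {}

// newPlainExporter builds a statsd exporter around the adapter.
// Writer is the test hook; real callers would set Config.URL instead.
func newPlainExporter() (*statsd.Exporter, error) {
	return statsd.NewExporter(statsd.Config{
		Writer:        os.Stdout,
		MaxPacketSize: 1024,
	}, plainAdapter{})
}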

View File

@@ -0,0 +1,341 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package statsd_test
import (
"bytes"
"context"
"fmt"
"strconv"
"strings"
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/unit"
"go.opentelemetry.io/otel/exporter/metric/internal/statsd"
"go.opentelemetry.io/otel/exporter/metric/test"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/counter"
"go.opentelemetry.io/otel/sdk/metric/aggregator/gauge"
)
// withTagsAdapter tests a dogstatsd-style statsd exporter.
type withTagsAdapter struct {
*statsd.LabelEncoder
}
func (*withTagsAdapter) AppendName(rec export.Record, buf *bytes.Buffer) {
_, _ = buf.WriteString(rec.Descriptor().Name())
}
func (ta *withTagsAdapter) AppendTags(rec export.Record, buf *bytes.Buffer) {
encoded, _ := ta.LabelEncoder.ForceEncode(rec.Labels())
_, _ = buf.WriteString(encoded)
}
func newWithTagsAdapter() *withTagsAdapter {
return &withTagsAdapter{
statsd.NewLabelEncoder(),
}
}
// noTagsAdapter simulates a plain-statsd exporter that appends tag
// values to the metric name.
type noTagsAdapter struct {
}
func (*noTagsAdapter) AppendName(rec export.Record, buf *bytes.Buffer) {
_, _ = buf.WriteString(rec.Descriptor().Name())
for _, tag := range rec.Labels().Ordered() {
_, _ = buf.WriteString(".")
_, _ = buf.WriteString(tag.Value.Emit())
}
}
func (*noTagsAdapter) AppendTags(rec export.Record, buf *bytes.Buffer) {
}
func newNoTagsAdapter() *noTagsAdapter {
return &noTagsAdapter{}
}
type testWriter struct {
vec []string
}
func (w *testWriter) Write(b []byte) (int, error) {
w.vec = append(w.vec, string(b))
return len(b), nil
}
func testNumber(desc *export.Descriptor, v float64) core.Number {
if desc.NumberKind() == core.Float64NumberKind {
return core.NewFloat64Number(v)
}
return core.NewInt64Number(int64(v))
}
func gaugeAgg(desc *export.Descriptor, v float64) export.Aggregator {
ctx := context.Background()
gagg := gauge.New()
_ = gagg.Update(ctx, testNumber(desc, v), desc)
gagg.Checkpoint(ctx, desc)
return gagg
}
func counterAgg(desc *export.Descriptor, v float64) export.Aggregator {
ctx := context.Background()
cagg := counter.New()
_ = cagg.Update(ctx, testNumber(desc, v), desc)
cagg.Checkpoint(ctx, desc)
return cagg
}
func measureAgg(desc *export.Descriptor, v float64) export.Aggregator {
ctx := context.Background()
magg := array.New()
_ = magg.Update(ctx, testNumber(desc, v), desc)
magg.Checkpoint(ctx, desc)
return magg
}
func TestBasicFormat(t *testing.T) {
type adapterOutput struct {
adapter statsd.Adapter
expected string
}
for _, ao := range []adapterOutput{{
adapter: newWithTagsAdapter(),
expected: `counter:%s|c|#A:B,C:D
gauge:%s|g|#A:B,C:D
measure:%s|h|#A:B,C:D
timer:%s|ms|#A:B,C:D
`}, {
adapter: newNoTagsAdapter(),
expected: `counter.B.D:%s|c
gauge.B.D:%s|g
measure.B.D:%s|h
timer.B.D:%s|ms
`},
} {
adapter := ao.adapter
expected := ao.expected
t.Run(fmt.Sprintf("%T", adapter), func(t *testing.T) {
for _, nkind := range []core.NumberKind{
core.Float64NumberKind,
core.Int64NumberKind,
} {
t.Run(nkind.String(), func(t *testing.T) {
ctx := context.Background()
writer := &testWriter{}
config := statsd.Config{
Writer: writer,
MaxPacketSize: 1024,
}
exp, err := statsd.NewExporter(config, adapter)
if err != nil {
t.Fatal("New error: ", err)
}
checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
cdesc := export.NewDescriptor(
"counter", export.CounterKind, nil, "", "", nkind, false)
gdesc := export.NewDescriptor(
"gauge", export.GaugeKind, nil, "", "", nkind, false)
mdesc := export.NewDescriptor(
"measure", export.MeasureKind, nil, "", "", nkind, false)
tdesc := export.NewDescriptor(
"timer", export.MeasureKind, nil, "", unit.Milliseconds, nkind, false)
labels := []core.KeyValue{
key.New("A").String("B"),
key.New("C").String("D"),
}
const value = 123.456
checkpointSet.Add(cdesc, counterAgg(cdesc, value), labels...)
checkpointSet.Add(gdesc, gaugeAgg(gdesc, value), labels...)
checkpointSet.Add(mdesc, measureAgg(mdesc, value), labels...)
checkpointSet.Add(tdesc, measureAgg(tdesc, value), labels...)
err = exp.Export(ctx, checkpointSet)
require.Nil(t, err)
var vfmt string
if nkind == core.Int64NumberKind {
fv := float64(value)
vfmt = strconv.FormatInt(int64(fv), 10)
} else {
vfmt = strconv.FormatFloat(value, 'g', -1, 64)
}
require.Equal(t, 1, len(writer.vec))
require.Equal(t, fmt.Sprintf(expected, vfmt, vfmt, vfmt, vfmt), writer.vec[0])
})
}
})
}
}
func makeLabels(offset, nkeys int) []core.KeyValue {
r := make([]core.KeyValue, nkeys)
for i := range r {
r[i] = key.New(fmt.Sprint("k", offset+i)).String(fmt.Sprint("v", offset+i))
}
return r
}
type splitTestCase struct {
name string
setup func(add func(int))
check func(expected, got []string, t *testing.T)
}
var splitTestCases = []splitTestCase{
// These tests use the number of keys to control where packets
// are split.
{"Simple",
func(add func(int)) {
add(1)
add(1000)
add(1)
},
func(expected, got []string, t *testing.T) {
require.EqualValues(t, expected, got)
},
},
{"LastBig",
func(add func(int)) {
add(1)
add(1)
add(1000)
},
func(expected, got []string, t *testing.T) {
require.Equal(t, 2, len(got))
require.EqualValues(t, []string{
expected[0] + expected[1],
expected[2],
}, got)
},
},
{"FirstBig",
func(add func(int)) {
add(1000)
add(1)
add(1)
add(1000)
add(1)
add(1)
},
func(expected, got []string, t *testing.T) {
require.Equal(t, 4, len(got))
require.EqualValues(t, []string{
expected[0],
expected[1] + expected[2],
expected[3],
expected[4] + expected[5],
}, got)
},
},
{"OneBig",
func(add func(int)) {
add(1000)
},
func(expected, got []string, t *testing.T) {
require.EqualValues(t, expected, got)
},
},
{"LastSmall",
func(add func(int)) {
add(1000)
add(1)
},
func(expected, got []string, t *testing.T) {
require.EqualValues(t, expected, got)
},
},
{"Overflow",
func(add func(int)) {
for i := 0; i < 1000; i++ {
add(1)
}
},
func(expected, got []string, t *testing.T) {
require.Less(t, 1, len(got))
require.Equal(t, strings.Join(expected, ""), strings.Join(got, ""))
},
},
{"Empty",
func(add func(int)) {
},
func(expected, got []string, t *testing.T) {
require.Equal(t, 0, len(got))
},
},
{"AllBig",
func(add func(int)) {
add(1000)
add(1000)
add(1000)
},
func(expected, got []string, t *testing.T) {
require.EqualValues(t, expected, got)
},
},
}
func TestPacketSplit(t *testing.T) {
for _, tcase := range splitTestCases {
t.Run(tcase.name, func(t *testing.T) {
ctx := context.Background()
writer := &testWriter{}
config := statsd.Config{
Writer: writer,
MaxPacketSize: 1024,
}
adapter := newWithTagsAdapter()
exp, err := statsd.NewExporter(config, adapter)
if err != nil {
t.Fatal("New error: ", err)
}
checkpointSet := test.NewCheckpointSet(adapter.LabelEncoder)
desc := export.NewDescriptor("counter", export.CounterKind, nil, "", "", core.Int64NumberKind, false)
var expected []string
offset := 0
tcase.setup(func(nkeys int) {
labels := makeLabels(offset, nkeys)
offset += nkeys
expect := fmt.Sprint("counter:100|c", adapter.LabelEncoder.Encode(labels), "\n")
expected = append(expected, expect)
checkpointSet.Add(desc, counterAgg(desc, 100), labels...)
})
err = exp.Export(ctx, checkpointSet)
require.Nil(t, err)
tcase.check(expected, writer.vec, t)
})
}
}

View File

@@ -0,0 +1,84 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package statsd
import (
"bytes"
"sync"
"go.opentelemetry.io/otel/api/core"
export "go.opentelemetry.io/otel/sdk/export/metric"
)
// LabelEncoder encodes metric labels in the dogstatsd syntax.
//
// TODO: find a link for this syntax. It's been copied out of code,
// not a specification:
//
// https://github.com/stripe/veneur/blob/master/sinks/datadog/datadog.go
type LabelEncoder struct {
pool sync.Pool
}
// sameCheck is used to test whether label encoders are the same.
type sameCheck interface {
isStatsd()
}
var _ export.LabelEncoder = &LabelEncoder{}
// NewLabelEncoder returns a new encoder for dogstatsd-syntax metric
// labels.
func NewLabelEncoder() *LabelEncoder {
return &LabelEncoder{
pool: sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
},
}
}
// Encode emits a string like "|#key1:value1,key2:value2".
func (e *LabelEncoder) Encode(labels []core.KeyValue) string {
buf := e.pool.Get().(*bytes.Buffer)
defer e.pool.Put(buf)
buf.Reset()
delimiter := "|#"
for _, kv := range labels {
_, _ = buf.WriteString(delimiter)
_, _ = buf.WriteString(string(kv.Key))
_, _ = buf.WriteRune(':')
_, _ = buf.WriteString(kv.Value.Emit())
delimiter = ","
}
return buf.String()
}
func (e *LabelEncoder) isStatsd() {}
// ForceEncode returns a statsd label encoding, even if the exported
// labels were encoded by a different type of encoder. Returns a
// boolean to indicate whether the labels were in fact re-encoded, to
// test for (and warn about) inefficiency.
func (e *LabelEncoder) ForceEncode(labels export.Labels) (string, bool) {
if _, ok := labels.Encoder().(sameCheck); ok {
return labels.Encoded(), false
}
return e.Encode(labels.Ordered()), true
}
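
A small, hypothetical usage sketch (not part of this commit) showing the encoding this type produces; since the package is internal, only in-tree code such as the test below can call it directly:

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/api/core"
	"go.opentelemetry.io/otel/api/key"
	"go.opentelemetry.io/otel/exporter/metric/internal/statsd"
)

func main() {
	enc := statsd.NewLabelEncoder()
	encoded := enc.Encode([]core.KeyValue{
		key.New("A").String("B"),
		key.New("C").String("D"),
	})
	fmt.Println(encoded) // prints: |#A:B,C:D
}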

View File

@@ -0,0 +1,73 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package statsd_test
import (
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/exporter/metric/internal/statsd"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
)
var testLabels = []core.KeyValue{
key.New("A").String("B"),
key.New("C").String("D"),
key.New("E").Float64(1.5),
}
func TestLabelSyntax(t *testing.T) {
encoder := statsd.NewLabelEncoder()
require.Equal(t, `|#A:B,C:D,E:1.5`, encoder.Encode(testLabels))
require.Equal(t, `|#A:B`, encoder.Encode([]core.KeyValue{
key.New("A").String("B"),
}))
require.Equal(t, "", encoder.Encode(nil))
}
func TestLabelForceEncode(t *testing.T) {
defaultLabelEncoder := sdk.NewDefaultLabelEncoder()
statsdLabelEncoder := statsd.NewLabelEncoder()
exportLabelsDefault := export.NewLabels(testLabels, defaultLabelEncoder.Encode(testLabels), defaultLabelEncoder)
exportLabelsStatsd := export.NewLabels(testLabels, statsdLabelEncoder.Encode(testLabels), statsdLabelEncoder)
statsdEncoding := exportLabelsStatsd.Encoded()
require.NotEqual(t, statsdEncoding, exportLabelsDefault.Encoded())
forced, repeat := statsdLabelEncoder.ForceEncode(exportLabelsDefault)
require.Equal(t, statsdEncoding, forced)
require.True(t, repeat)
forced, repeat = statsdLabelEncoder.ForceEncode(exportLabelsStatsd)
require.Equal(t, statsdEncoding, forced)
require.False(t, repeat)
// Check that this works for an embedded implementation.
exportLabelsEmbed := export.NewLabels(testLabels, statsdEncoding, struct {
*statsd.LabelEncoder
}{LabelEncoder: statsdLabelEncoder})
forced, repeat = statsdLabelEncoder.ForceEncode(exportLabelsEmbed)
require.Equal(t, statsdEncoding, forced)
require.False(t, repeat)
}

View File

@@ -79,7 +79,7 @@ func TestStdoutTimestamp(t *testing.T) {
before := time.Now()
-checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder())
+checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
ctx := context.Background()
desc := export.NewDescriptor("test.name", export.GaugeKind, nil, "", "", core.Int64NumberKind, false)
@@ -125,7 +125,7 @@ func TestStdoutTimestamp(t *testing.T) {
func TestStdoutCounterFormat(t *testing.T) {
fix := newFixture(t, stdout.Options{})
-checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder())
+checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
desc := export.NewDescriptor("test.name", export.CounterKind, nil, "", "", core.Int64NumberKind, false)
cagg := counter.New()
@@ -142,7 +142,7 @@ func TestStdoutCounterFormat(t *testing.T) {
func TestStdoutGaugeFormat(t *testing.T) {
fix := newFixture(t, stdout.Options{})
-checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder())
+checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
desc := export.NewDescriptor("test.name", export.GaugeKind, nil, "", "", core.Float64NumberKind, false)
gagg := gauge.New()
@@ -159,7 +159,7 @@ func TestStdoutGaugeFormat(t *testing.T) {
func TestStdoutMaxSumCount(t *testing.T) {
fix := newFixture(t, stdout.Options{})
-checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder())
+checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind, false)
magg := maxsumcount.New()
@@ -179,7 +179,7 @@ func TestStdoutMeasureFormat(t *testing.T) {
PrettyPrint: true,
})
-checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder())
+checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind, false)
magg := array.New()
@@ -223,7 +223,7 @@ func TestStdoutMeasureFormat(t *testing.T) {
func TestStdoutAggError(t *testing.T) {
fix := newFixture(t, stdout.Options{})
-checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder())
+checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind, false)
magg := ddsketch.New(ddsketch.NewDefaultConfig(), desc)
@@ -242,7 +242,7 @@ func TestStdoutAggError(t *testing.T) {
func TestStdoutGaugeNotSet(t *testing.T) {
fix := newFixture(t, stdout.Options{})
-checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder())
+checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
desc := export.NewDescriptor("test.name", export.GaugeKind, nil, "", "", core.Float64NumberKind, false)
gagg := gauge.New()

View File

@@ -53,6 +53,11 @@ type (
LastValue() (core.Number, time.Time, error)
}
+// Points returns the raw set of values that were aggregated.
+Points interface {
+Points() ([]core.Number, error)
+}
// MaxSumCount supports the Max, Sum, and Count interfaces.
MaxSumCount interface {
Sum

View File

@@ -29,17 +29,18 @@ import (
type (
Aggregator struct {
lock sync.Mutex
-current Points
-checkpoint Points
+current points
+checkpoint points
ckptSum core.Number
}
-Points []core.Number
+points []core.Number
)
var _ export.Aggregator = &Aggregator{}
var _ aggregator.MaxSumCount = &Aggregator{}
var _ aggregator.Distribution = &Aggregator{}
+var _ aggregator.Points = &Aggregator{}
// New returns a new array aggregator, which aggregates recorded
// measurements by storing them in an array. This type uses a mutex
@@ -74,6 +75,11 @@ func (c *Aggregator) Quantile(q float64) (core.Number, error) {
return c.checkpoint.Quantile(q)
}
+// Points returns access to the raw data set.
+func (c *Aggregator) Points() ([]core.Number, error) {
+return c.checkpoint, nil
+}
// Checkpoint saves the current state and resets the current state to
// the empty set, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) {
@@ -133,8 +139,8 @@ func (c *Aggregator) sort(kind core.NumberKind) {
}
}
-func combine(a, b Points, kind core.NumberKind) Points {
-result := make(Points, 0, len(a)+len(b))
+func combine(a, b points, kind core.NumberKind) points {
+result := make(points, 0, len(a)+len(b))
for len(a) != 0 && len(b) != 0 {
if a[0].CompareNumber(kind, b[0]) < 0 {
@@ -150,25 +156,25 @@ func combine(a, b Points, kind core.NumberKind) Points {
return result
}
-func (p *Points) Len() int {
+func (p *points) Len() int {
return len(*p)
}
-func (p *Points) Less(i, j int) bool {
+func (p *points) Less(i, j int) bool {
// Note this is specialized for int64, because float64 is
// handled by `sort.Float64s` and uint64 numbers never appear
// in this data.
return int64((*p)[i]) < int64((*p)[j])
}
-func (p *Points) Swap(i, j int) {
+func (p *points) Swap(i, j int) {
(*p)[i], (*p)[j] = (*p)[j], (*p)[i]
}
// Quantile returns the least X such that Pr(x<X)>=q, where X is an
// element of the data set. This uses the "Nearest-Rank" definition
// of a quantile.
-func (p *Points) Quantile(q float64) (core.Number, error) {
+func (p *points) Quantile(q float64) (core.Number, error) {
if len(*p) == 0 {
return core.Number(0), aggregator.ErrEmptyDataSet
}
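
Together with the aggregator.Points interface added in the export API above, an exporter can now recover the raw recorded values from an array aggregator's checkpoint. A hypothetical sketch (not part of this commit) of the type-assertion pattern, mirroring what the statsd exporter's formatMetric does:

package main

import (
	"fmt"

	export "go.opentelemetry.io/otel/sdk/export/metric"
	"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
)

// describe prints a short summary of an aggregator's checkpointed state.
func describe(agg export.Aggregator) {
	if pts, ok := agg.(aggregator.Points); ok {
		points, err := pts.Points()
		if err != nil {
			return
		}
		fmt.Println("raw points:", len(points))
	} else if sum, ok := agg.(aggregator.Sum); ok {
		v, err := sum.Sum()
		if err != nil {
			return
		}
		fmt.Println("sum:", v.AsFloat64()) // assumes a float64-kind instrument
	}
}

func main() {}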

View File

@@ -55,7 +55,7 @@ var (
// SdkEncoder uses a non-standard encoder like K1~V1&K2~V2
SdkEncoder = &Encoder{}
// GroupEncoder uses the SDK default encoder
-GroupEncoder = sdk.DefaultLabelEncoder()
+GroupEncoder = sdk.NewDefaultLabelEncoder()
// Gauge groups are (labels1), (labels2+labels3)
// Counter groups are (labels1+labels2), (labels3)

View File

@@ -42,7 +42,7 @@ func newFixture(b *testing.B) *benchFixture {
bf := &benchFixture{
B: b,
}
-bf.sdk = sdk.New(bf, sdk.DefaultLabelEncoder())
+bf.sdk = sdk.New(bf, sdk.NewDefaultLabelEncoder())
return bf
}

View File

@@ -76,7 +76,7 @@ func New(batcher export.Batcher, exporter export.Exporter, period time.Duration)
lencoder, _ := exporter.(export.LabelEncoder)
if lencoder == nil {
-lencoder = sdk.DefaultLabelEncoder()
+lencoder = sdk.NewDefaultLabelEncoder()
}
return &Controller{

View File

@@ -64,7 +64,7 @@ var _ push.Clock = mockClock{}
var _ push.Ticker = mockTicker{}
func newFixture(t *testing.T) testFixture {
-checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder())
+checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
batcher := &testBatcher{
t: t,

View File

@@ -69,7 +69,7 @@ func TestInputRangeTestCounter(t *testing.T) {
t: t,
agg: cagg,
}
-sdk := sdk.New(batcher, sdk.DefaultLabelEncoder())
+sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder())
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
@@ -104,7 +104,7 @@ func TestInputRangeTestMeasure(t *testing.T) {
t: t,
agg: magg,
}
-sdk := sdk.New(batcher, sdk.DefaultLabelEncoder())
+sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder())
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
@@ -139,7 +139,7 @@ func TestDisabledInstrument(t *testing.T) {
t: t,
agg: nil,
}
-sdk := sdk.New(batcher, sdk.DefaultLabelEncoder())
+sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder())
measure := sdk.NewFloat64Measure("measure.name", metric.WithAbsolute(true))
measure.Record(ctx, -1, sdk.Labels())
@@ -154,7 +154,7 @@ func TestRecordNaN(t *testing.T) {
t: t,
agg: gauge.New(),
}
-sdk := sdk.New(batcher, sdk.DefaultLabelEncoder())
+sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder())
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
@@ -188,7 +188,7 @@ func TestSDKLabelEncoder(t *testing.T) {
}
func TestDefaultLabelEncoder(t *testing.T) {
-encoder := sdk.DefaultLabelEncoder()
+encoder := sdk.NewDefaultLabelEncoder()
encoded := encoder.Encode([]core.KeyValue{key.String("A", "B"), key.String("C", "D")})
require.Equal(t, `A=B,C=D`, encoded)
}

View File

@@ -37,7 +37,7 @@ func ExampleNew() {
if err != nil {
panic(fmt.Sprintln("Could not initialize stdout exporter:", err))
}
-batcher := defaultkeys.New(selector, sdk.DefaultLabelEncoder(), true)
+batcher := defaultkeys.New(selector, sdk.NewDefaultLabelEncoder(), true)
pusher := push.New(batcher, exporter, time.Second)
pusher.Start()
defer pusher.Stop()

View File

@@ -35,7 +35,7 @@ type defaultLabelEncoder struct {
var _ export.LabelEncoder = &defaultLabelEncoder{}
-func DefaultLabelEncoder() export.LabelEncoder {
+func NewDefaultLabelEncoder() export.LabelEncoder {
return &defaultLabelEncoder{
pool: sync.Pool{
New: func() interface{} {

View File

@@ -70,7 +70,7 @@ func TestMonotoneGauge(t *testing.T) {
batcher := &monotoneBatcher{
t: t,
}
-sdk := sdk.New(batcher, sdk.DefaultLabelEncoder())
+sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder())
sdk.SetErrorHandler(func(error) { t.Fatal("Unexpected") })

View File

@@ -290,7 +290,7 @@ func stressTest(t *testing.T, impl testImpl) {
lused: map[string]bool{},
}
cc := concurrency()
-sdk := sdk.New(fixture, sdk.DefaultLabelEncoder())
+sdk := sdk.New(fixture, sdk.NewDefaultLabelEncoder())
fixture.wg.Add(cc + 1)
for i := 0; i < cc; i++ {