1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2024-12-20 19:52:56 +02:00
opentelemetry-go/exporter/metric/internal/statsd/conn.go

289 lines
7.1 KiB
Go
Raw Normal View History

Dogstatsd metrics exporter (#326) * Add MetricAggregator.Merge() implementations * Update from feedback * Type * Ckpt * Ckpt * Add push controller * Ckpt * Add aggregator interfaces, stdout encoder * Modify basic main.go * Main is working * Batch stdout output * Sum udpate * Rename stdout * Add stateless/stateful Batcher options * Undo a for-loop in the example, remove a done TODO * Update imports * Add note * Rename defaultkeys * Support variable label encoder to speed OpenMetrics/Statsd export * Lint * Checkpoint * Checkpoint * Doc * Precommit/lint * Simplify Aggregator API * Record->Identifier * Remove export.Record a.k.a. Identifier * Checkpoint * Propagate errors to the SDK, remove a bunch of 'TODO warn' * Checkpoint * Introduce export.Labels * Comments in export/metric.go * Comment * More merge * More doc * Complete example * Lint fixes * Add a testable example * Lint * Dogstats * Let Export return an error * Checkpoint * add a basic stdout exporter test * Add measure test; fix aggregator APIs * Use JSON numbers, not strings * Test stdout exporter error * Add a test for the call to RangeTest * Add error handler API to improve correctness test; return errors from RecordOne * Undo the previous -- do not expose errors * Add simple selector variations, test * Repair examples * Test push controller error handling * Add SDK label encoder tests * Add a defaultkeys batcher test * Add an ungrouped batcher test * Lint new tests * Respond to krnowak's feedback * Checkpoint * Funciontal example using unixgram * Tidy the example * Add a packet-split test * More tests * Undo comment * Use concrete receivers for export records and labels, since the constructors return structs not pointers * Bug fix for stateful batchers; clone an aggregator for long term storage * Remove TODO addressed in #318 * Add errors to all aggregator interfaces * Handle ErrNoLastValue case in stdout exporter * Move aggregator API into sdk/export/metric/aggregator * Update all aggregator 
exported-method comments * Document the aggregator APIs * More aggregator comments * Add multiple updates to the ungrouped test * Fixes for feedback from Gustavo and Liz * Producer->CheckpointSet; add FinishedCollection * Process takes an export.Record * ReadCheckpoint->CheckpointSet * EncodeLabels->Encode * Format a better inconsistent type error; add more aggregator API tests * More RangeTest test coverage * Make benbjohnson/clock a test-only dependency * Handle ErrNoLastValue in stress_test * Update comments; use a pipe vs a unix socket in the example test * Update test * Spelling * Typo fix * Rename DefaultLabelEncoder to NewDefaultLabelEncoder for clarity * Rename DefaultLabelEncoder to NewDefaultLabelEncoder for clarity * Test different adapters; add ForceEncode to statsd label encoder
2019-11-22 06:46:05 +02:00
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package statsd
// See https://github.com/b/statsd_spec for the best-available statsd
// syntax specification. See also
// https://github.com/statsd/statsd/edit/master/docs/metric_types.md
import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/url"
"strconv"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/unit"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
)
// Config supports common options that apply to statsd exporters.
type Config struct {
	// URL describes the destination for exporting statsd data.
	// e.g., udp://host:port
	//       tcp://host:port
	//       unix:///socket/path
	//
	// NOTE(review): dial in this file only accepts the udp, udp4,
	// udp6, unix and unixgram schemes; tcp is still a TODO there.
	URL string

	// Writer is an alternative to providing a URL. When Writer is
	// non-nil, URL is ignored and the exporter writes to the
	// configured Writer instead of dialing a connection.
	Writer io.Writer

	// MaxPacketSize limits the packet size for packet-oriented
	// transports. NewExporter replaces values <= 0 with the
	// package-level MaxPacketSize default.
	MaxPacketSize int

	// TODO support Dial and Write timeouts
}

// Exporter is a common type meant to implement concrete statsd
// exporters.
type Exporter struct {
	// adapter supplies the syntax-specific name and tag encoding.
	adapter Adapter
	// config is the configuration the exporter was built with, after
	// defaults have been applied.
	config Config
	// conn is the connection dialed from config.URL; it stays nil
	// when a Writer was supplied or when dialing failed.
	conn net.Conn
	// writer receives the encoded statsd data.
	writer io.Writer
	// buffer accumulates encoded statistics and is reset at the
	// start of every Export call.
	buffer bytes.Buffer
}

// Adapter supports statsd syntax variations, primarily plain
// statsd vs. dogstatsd.
type Adapter interface {
	// AppendName is called first for every statistic, before the
	// ':' separator and the value are written.
	AppendName(export.Record, *bytes.Buffer)
	// AppendTags is called after the type suffix and before the
	// terminating newline of every statistic.
	AppendTags(export.Record, *bytes.Buffer)
}
// Statsd type suffixes, written after the '|' separator of each
// encoded statistic.
const (
	formatCounter   = "c"
	formatHistogram = "h"
	formatGauge     = "g"
	formatTiming    = "ms"
)

// MaxPacketSize is the default packet size limit, used by NewExporter
// when Config.MaxPacketSize is not positive.
//
// NOTE(review): 1<<16 is larger than the largest payload a single
// UDP/IPv4 datagram can carry (65507 bytes) — confirm this default is
// intended for datagram transports.
const MaxPacketSize = 1 << 16
// Compile-time assertion that *Exporter implements export.Exporter.
var _ export.Exporter = (*Exporter)(nil)

// ErrInvalidScheme is returned by dial when the endpoint URL uses a
// scheme other than the supported udp and unix datagram variants.
var ErrInvalidScheme = fmt.Errorf("Invalid statsd transport")
// NewExporter returns a common implementation for exporters that
// export statsd syntax.
//
// A non-positive config.MaxPacketSize is replaced by the package
// default. When config.Writer is set it is used as the destination
// and config.URL is ignored; otherwise config.URL is dialed.
//
// The returned *Exporter is non-nil even when the error is non-nil:
// if dialing fails the exporter is returned without a connection or
// writer, together with the dial error.
func NewExporter(config Config, adapter Adapter) (*Exporter, error) {
	if config.MaxPacketSize <= 0 {
		config.MaxPacketSize = MaxPacketSize
	}

	exp := &Exporter{
		adapter: adapter,
		config:  config,
	}

	// An explicit Writer takes precedence over the URL.
	if config.Writer != nil {
		exp.writer = config.Writer
		return exp, nil
	}

	dialed, err := dial(config.URL)
	exp.conn = dialed
	if dialed != nil {
		exp.writer = dialed
	}

	// TODO: If err != nil, we return it _with_ a valid exporter; the
	// exporter should attempt to re-dial if it's retryable. Add a
	// Start() and Stop() API.
	return exp, err
}
// dial connects to a statsd service described by endpoint, a URL
// whose scheme selects the network type. Presently only datagram
// connections are supported:
//
//   - "udp", "udp4", "udp6": the URL host (host:port) is the remote
//     address.
//   - "unix", "unixgram": the URL path is the socket path; both
//     schemes dial a "unixgram" socket.
//
// It returns the URL parse error, the address resolution or dial
// error, or ErrInvalidScheme for any other scheme.
func dial(endpoint string) (net.Conn, error) {
	parsed, err := url.Parse(endpoint)
	if err != nil {
		return nil, err
	}

	// TODO: Support tcp destination, need configurable timeouts first.
	switch network := parsed.Scheme; network {
	case "udp", "udp4", "udp6":
		remote, err := net.ResolveUDPAddr(network, parsed.Host)
		if err != nil {
			return nil, err
		}
		// A zero-valued local address is passed explicitly.
		udpConn, err := net.DialUDP(network, &net.UDPAddr{}, remote)
		if err != nil {
			return nil, err
		}
		return udpConn, nil

	case "unix", "unixgram":
		// Both spellings are dialed as a datagram socket.
		const unixNetwork = "unixgram"
		remote, err := net.ResolveUnixAddr(unixNetwork, parsed.Path)
		if err != nil {
			return nil, err
		}
		unixConn, err := net.DialUnix(unixNetwork, &net.UnixAddr{}, remote)
		if err != nil {
			return nil, err
		}
		return unixConn, nil
	}

	return nil, ErrInvalidScheme
}
// Export is common code for any statsd-based metric.Exporter
// implementation.
//
// Every record in checkpointSet is formatted into the exporter's
// reusable buffer. Whenever the buffer reaches config.MaxPacketSize
// the statistics accumulated before the current record are sent as
// one write and the current record's output is carried over to start
// the next packet. A record whose output alone reaches the limit is
// sent by itself. Whatever remains after the last record is sent at
// the end.
//
// Processing continues past failures. The first send error, if any,
// is returned; otherwise the first formatting error, if any, is
// returned.
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
	buf := &e.buffer
	buf.Reset()

	var aggErr error
	var sendErr error

	// flush writes data and remembers only the first send failure.
	flush := func(data []byte) {
		if err := e.send(data); err != nil && sendErr == nil {
			sendErr = err
		}
	}

	checkpointSet.ForEach(func(rec export.Record) {
		before := buf.Len()

		if err := e.formatMetric(rec, buf); err != nil {
			if aggErr == nil {
				aggErr = err
			}
			// Skip every failed record, not just the first one, and
			// drop anything it may have appended so a partial
			// statistic never reaches the wire.
			buf.Truncate(before)
			return
		}

		if buf.Len() < e.config.MaxPacketSize {
			return
		}

		if before == 0 {
			// A single metric >= packet size
			flush(buf.Bytes())
			buf.Reset()
			return
		}

		// Send what was buffered before this record, then slide this
		// record's output to the front of the buffer.
		flush(buf.Bytes()[:before])
		leftover := buf.Len() - before
		copy(buf.Bytes()[:leftover], buf.Bytes()[before:])
		buf.Truncate(leftover)
	})

	// Send the final, possibly empty, remainder.
	flush(buf.Bytes())

	if sendErr != nil {
		return sendErr
	}
	return aggErr
}
// send writes a complete buffer to the writer as a blocking call,
// calling Write repeatedly until every byte has been accepted.
//
// An empty buf is a no-op. It returns an error when the exporter has
// no writer (NewExporter returns such an exporter when dialing
// fails), the writer's own error when a Write fails, and
// io.ErrShortWrite when a Write reports no progress without an error,
// which would otherwise loop forever.
func (e *Exporter) send(buf []byte) error {
	if len(buf) == 0 {
		return nil
	}
	if e.writer == nil {
		return fmt.Errorf("statsd exporter has no writer")
	}
	for len(buf) != 0 {
		n, err := e.writer.Write(buf)
		if err != nil {
			return err
		}
		if n <= 0 {
			return io.ErrShortWrite
		}
		buf = buf[n:]
	}
	return nil
}
// formatMetric formats an individual export record into buf. For
// some records this emits a single statistic, for some it emits more
// than one.
//
// The aggregator's capabilities are probed in a fixed order and the
// first match wins:
//
//   - aggregator.Points: one statistic per point, typed as a timing
//     when the descriptor's unit is milliseconds and as a histogram
//     otherwise.
//   - aggregator.Sum: a single counter.
//   - aggregator.LastValue: a single gauge.
//
// An aggregator matching none of these produces no output. Any error
// reported by the aggregator is returned before anything is written
// for the record.
func (e *Exporter) formatMetric(rec export.Record, buf *bytes.Buffer) error {
	desc := rec.Descriptor()

	// TODO handle non-Points Distribution/MaxSumCount by
	// formatting individual quantiles, the sum, and the count as
	// single statistics. For the dogstatsd variation, assuming
	// open-source systems like Veneur add support, figure out the
	// proper encoding for "d"-type distribution data.
	switch agg := rec.Aggregator().(type) {
	case aggregator.Points:
		format := formatHistogram
		if desc.Unit() == unit.Milliseconds {
			format = formatTiming
		}
		points, err := agg.Points()
		if err != nil {
			return err
		}
		for _, pt := range points {
			e.formatSingleStat(rec, pt, format, buf)
		}

	case aggregator.Sum:
		total, err := agg.Sum()
		if err != nil {
			return err
		}
		e.formatSingleStat(rec, total, formatCounter, buf)

	case aggregator.LastValue:
		last, _, err := agg.LastValue()
		if err != nil {
			return err
		}
		e.formatSingleStat(rec, last, formatGauge, buf)
	}
	return nil
}
// formatSingleStat encodes a single item of statsd data followed by
// a newline, in the form
//
//	<name>:<value>|<type><tags>\n
//
// where the name and tags are produced by the adapter, the value is
// rendered according to the descriptor's number kind, and the type is
// fmtStr.
func (e *Exporter) formatSingleStat(rec export.Record, val core.Number, fmtStr string, buf *bytes.Buffer) {
	e.adapter.AppendName(rec, buf)

	// Writes to a bytes.Buffer always report a nil error, so the
	// results below are deliberately ignored.
	_ = buf.WriteByte(':')
	writeNumber(buf, val, rec.Descriptor().NumberKind())
	_ = buf.WriteByte('|')
	_, _ = buf.WriteString(fmtStr)

	e.adapter.AppendTags(rec, buf)
	_ = buf.WriteByte('\n')
}
// writeNumber appends the decimal text form of num to buf,
// interpreting num according to kind: base-10 for the signed and
// unsigned integer kinds, and the shortest 'g' representation that
// round-trips for the 64-bit float kind. Nothing is written for any
// other kind.
func writeNumber(buf *bytes.Buffer, num core.Number, kind core.NumberKind) {
	// Fixed-size scratch space for the strconv Append* calls to
	// format into.
	var scratch [128]byte
	text := scratch[:0]

	switch kind {
	case core.Int64NumberKind:
		text = strconv.AppendInt(text, num.AsInt64(), 10)
	case core.Float64NumberKind:
		text = strconv.AppendFloat(text, num.AsFloat64(), 'g', -1, 64)
	case core.Uint64NumberKind:
		text = strconv.AppendUint(text, num.AsUint64(), 10)
	}

	_, _ = buf.Write(text)
}