1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-07 23:02:15 +02:00
opentelemetry-go/exporters/metric/internal/statsd/conn.go
Tyler Yahn 52fb033e13
Rename the exporter directory to exporters (#502)
The `go.opentelemetry.io/otel/exporter/trace/jaeger` package was
mistakenly released with a `v1.0.0` tag instead of `v0.1.0`. This
resulted in all subsequent releases not becoming the default latest,
meaning that `go get`s pulled in the incompatible `v0.1.0` release of
that package when pulling in more recent packages from other otel
packages. Renaming the `exporter` directory to `exporters` fixes this
issue by consequentially renaming the package.

Additionally, this action also renames *all* exporters. This is
understood to be a disruptive action to existing users as they will need
to update any dependencies they currently have on our exporters.
However, it was decided to take this action regardless. The need to
resolve the existing issue explained above is highly important, and
given the Alpha state of this project these kinds of breaking changes
should be expected (though not without reason).

Resolves #331

Co-authored-by: Rahul Patel <rghetia@yahoo.com>
2020-03-02 13:54:57 -08:00

289 lines
7.1 KiB
Go

// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package statsd
// See https://github.com/b/statsd_spec for the best-available statsd
// syntax specification. See also
// https://github.com/statsd/statsd/edit/master/docs/metric_types.md
import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/url"
"strconv"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/unit"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
)
type (
// Config supports common configuration that applies to statsd exporters.
Config struct {
// URL describes the destination for exporting statsd data.
// e.g., udp://host:port
// tcp://host:port
// unix:///socket/path
URL string
// Writer is an alternate to providing a URL. When Writer is
// non-nil, URL will be ignored and the exporter will write to
// the configured Writer interface.
Writer io.Writer
// MaxPacketSize this limits the packet size for packet-oriented transports.
MaxPacketSize int
// TODO support Dial and Write timeouts
}
// Exporter is common type meant to implement concrete statsd
// exporters.
Exporter struct {
adapter Adapter
config Config
conn net.Conn
writer io.Writer
buffer bytes.Buffer
}
// Adapter supports statsd syntax variations, primarily plain
// statsd vs. dogstatsd.
Adapter interface {
AppendName(export.Record, *bytes.Buffer)
AppendTags(export.Record, *bytes.Buffer)
}
)
const (
formatCounter = "c"
formatHistogram = "h"
formatGauge = "g"
formatTiming = "ms"
MaxPacketSize = 1 << 16
)
var (
_ export.Exporter = &Exporter{}
ErrInvalidScheme = fmt.Errorf("invalid statsd transport")
)
// NewExport returns a common implementation for exporters that Export
// statsd syntax.
func NewExporter(config Config, adapter Adapter) (*Exporter, error) {
if config.MaxPacketSize <= 0 {
config.MaxPacketSize = MaxPacketSize
}
var writer io.Writer
var conn net.Conn
var err error
if config.Writer != nil {
writer = config.Writer
} else {
conn, err = dial(config.URL)
if conn != nil {
writer = conn
}
}
// TODO: If err != nil, we return it _with_ a valid exporter; the
// exporter should attempt to re-dial if it's retryable. Add a
// Start() and Stop() API.
return &Exporter{
adapter: adapter,
config: config,
conn: conn,
writer: writer,
}, err
}
// dial connects to a statsd service using several common network
// types. Presently "udp" and "unix" datagram socket connections are
// supported.
func dial(endpoint string) (net.Conn, error) {
dest, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
// TODO: Support tcp destination, need configurable timeouts first.
scheme := dest.Scheme
switch scheme {
case "udp", "udp4", "udp6":
udpAddr, err := net.ResolveUDPAddr(scheme, dest.Host)
locAddr := &net.UDPAddr{}
if err != nil {
return nil, err
}
conn, err := net.DialUDP(scheme, locAddr, udpAddr)
if err != nil {
return nil, err
}
return conn, err
case "unix", "unixgram":
scheme = "unixgram"
locAddr := &net.UnixAddr{}
sockAddr, err := net.ResolveUnixAddr(scheme, dest.Path)
if err != nil {
return nil, err
}
conn, err := net.DialUnix(scheme, locAddr, sockAddr)
if err != nil {
return nil, err
}
return conn, err
}
return nil, ErrInvalidScheme
}
// Export is common code for any statsd-based metric.Exporter implementation.
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
buf := &e.buffer
buf.Reset()
var aggErr error
var sendErr error
checkpointSet.ForEach(func(rec export.Record) {
before := buf.Len()
if err := e.formatMetric(rec, buf); err != nil && aggErr == nil {
aggErr = err
return
}
if buf.Len() < e.config.MaxPacketSize {
return
}
if before == 0 {
// A single metric >= packet size
if err := e.send(buf.Bytes()); err != nil && sendErr == nil {
sendErr = err
}
buf.Reset()
return
}
// Send and copy the leftover
if err := e.send(buf.Bytes()[:before]); err != nil && sendErr == nil {
sendErr = err
}
leftover := buf.Len() - before
copy(buf.Bytes()[0:leftover], buf.Bytes()[before:])
buf.Truncate(leftover)
})
if err := e.send(buf.Bytes()); err != nil && sendErr == nil {
sendErr = err
}
if sendErr != nil {
return sendErr
}
return aggErr
}
// send writes a complete buffer to the writer as a blocking call.
func (e *Exporter) send(buf []byte) error {
for len(buf) != 0 {
n, err := e.writer.Write(buf)
if err != nil {
return err
}
buf = buf[n:]
}
return nil
}
// formatMetric formats an individual export record. For some records
// this will emit a single statistic, for some it will emit more than
// one.
func (e *Exporter) formatMetric(rec export.Record, buf *bytes.Buffer) error {
desc := rec.Descriptor()
agg := rec.Aggregator()
// TODO handle non-Points Distribution/MaxSumCount by
// formatting individual quantiles, the sum, and the count as
// single statistics. For the dogstatsd variation, assuming
// open-source systems like Veneur add support, figure out the
// proper encoding for "d"-type distribution data.
if pts, ok := agg.(aggregator.Points); ok {
var format string
if desc.Unit() == unit.Milliseconds {
format = formatTiming
} else {
format = formatHistogram
}
points, err := pts.Points()
if err != nil {
return err
}
for _, pt := range points {
e.formatSingleStat(rec, pt, format, buf)
}
} else if sum, ok := agg.(aggregator.Sum); ok {
sum, err := sum.Sum()
if err != nil {
return err
}
e.formatSingleStat(rec, sum, formatCounter, buf)
} else if lv, ok := agg.(aggregator.LastValue); ok {
lv, _, err := lv.LastValue()
if err != nil {
return err
}
e.formatSingleStat(rec, lv, formatGauge, buf)
}
return nil
}
// formatSingleStat encodes a single item of statsd data followed by a
// newline.
func (e *Exporter) formatSingleStat(rec export.Record, val core.Number, fmtStr string, buf *bytes.Buffer) {
e.adapter.AppendName(rec, buf)
_, _ = buf.WriteRune(':')
writeNumber(buf, val, rec.Descriptor().NumberKind())
_, _ = buf.WriteRune('|')
_, _ = buf.WriteString(fmtStr)
e.adapter.AppendTags(rec, buf)
_, _ = buf.WriteRune('\n')
}
func writeNumber(buf *bytes.Buffer, num core.Number, kind core.NumberKind) {
var tmp [128]byte
var conv []byte
switch kind {
case core.Int64NumberKind:
conv = strconv.AppendInt(tmp[:0], num.AsInt64(), 10)
case core.Float64NumberKind:
conv = strconv.AppendFloat(tmp[:0], num.AsFloat64(), 'g', -1, 64)
case core.Uint64NumberKind:
conv = strconv.AppendUint(tmp[:0], num.AsUint64(), 10)
}
_, _ = buf.Write(conv)
}