1
0
mirror of https://github.com/go-kit/kit.git synced 2025-07-17 01:12:38 +02:00
Files
go-kit/metrics/cloudwatch2/cloudwatch2.go
Chris Hines 801da84a68 Replace kit/log with log (#1173)
* Implement log/... packages with github.com/go-kit/log

* Use github.com/go-kit/log/... in all the other packages
2021-08-18 17:15:32 +02:00

252 lines
6.8 KiB
Go

// Package cloudwatch2 emits all data as a StatisticsSet (rather than
// a singular Value) to CloudWatch via the aws-sdk-go-v2 SDK.
package cloudwatch2
import (
"context"
"math"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"golang.org/x/sync/errgroup"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/internal/convert"
"github.com/go-kit/kit/metrics/internal/lv"
"github.com/go-kit/log"
)
const (
maxConcurrentRequests = 20
)
// CloudWatchAPI is an interface that defines the set of Amazon CloudWatch API operations required by CloudWatch.
type CloudWatchAPI interface {
PutMetricData(ctx context.Context, params *cloudwatch.PutMetricDataInput, optFns ...func(*cloudwatch.Options)) (*cloudwatch.PutMetricDataOutput, error)
}
// CloudWatch receives metrics observations and forwards them to CloudWatch.
// Create a CloudWatch object, use it to create metrics, and pass those metrics as
// dependencies to the components that will use them.
//
// To regularly report metrics to CloudWatch, use the WriteLoop helper method.
type CloudWatch struct {
mtx sync.RWMutex
sem chan struct{}
namespace string
svc CloudWatchAPI
counters *lv.Space
logger log.Logger
numConcurrentRequests int
}
// Option is a function adapter to change config of the CloudWatch struct
type Option func(*CloudWatch)
// WithLogger sets the Logger that will receive error messages generated
// during the WriteLoop. By default, no logger is used.
func WithLogger(logger log.Logger) Option {
return func(cw *CloudWatch) {
cw.logger = logger
}
}
// WithConcurrentRequests sets the upper limit on how many
// cloudwatch.PutMetricDataRequest may be under way at any
// given time. If n is greater than 20, 20 is used. By default,
// the max is set at 10 concurrent requests.
func WithConcurrentRequests(n int) Option {
return func(cw *CloudWatch) {
if n > maxConcurrentRequests {
n = maxConcurrentRequests
}
cw.numConcurrentRequests = n
}
}
// New returns a CloudWatch object that may be used to create metrics.
// Namespace is applied to all created metrics and maps to the CloudWatch namespace.
// Callers must ensure that regular calls to Send are performed, either
// manually or with one of the helper methods.
func New(namespace string, svc CloudWatchAPI, options ...Option) *CloudWatch {
cw := &CloudWatch{
namespace: namespace,
svc: svc,
counters: lv.NewSpace(),
numConcurrentRequests: 10,
logger: log.NewNopLogger(),
}
for _, optFunc := range options {
optFunc(cw)
}
cw.sem = make(chan struct{}, cw.numConcurrentRequests)
return cw
}
// NewCounter returns a counter. Observations are aggregated and emitted once
// per write invocation.
func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
return &Counter{
name: name,
obs: cw.counters.Observe,
}
}
// NewGauge returns an gauge. Under the covers, there is no distinctions
// in CloudWatch for how Counters/Histograms/Gauges are reported, so this
// just wraps a cloudwatch2.Counter.
func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
return convert.NewCounterAsGauge(cw.NewCounter(name))
}
// NewHistogram returns a histogram. Under the covers, there is no distinctions
// in CloudWatch for how Counters/Histograms/Gauges are reported, so this
// just wraps a cloudwatch2.Counter.
func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
return convert.NewCounterAsHistogram(cw.NewCounter(name))
}
// WriteLoop is a helper method that invokes Send every time the passed
// channel fires. This method blocks until ctx is canceled, so clients
// probably want to run it in its own goroutine. For typical usage, create a
// time.Ticker and pass its C channel to this method.
func (cw *CloudWatch) WriteLoop(ctx context.Context, c <-chan time.Time) {
for {
select {
case <-c:
if err := cw.Send(); err != nil {
cw.logger.Log("during", "Send", "err", err)
}
case <-ctx.Done():
return
}
}
}
// Send will fire an API request to CloudWatch with the latest stats for
// all metrics. It is preferred that the WriteLoop method is used.
func (cw *CloudWatch) Send() error {
cw.mtx.RLock()
defer cw.mtx.RUnlock()
now := time.Now()
var datums []types.MetricDatum
cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
datums = append(datums, types.MetricDatum{
MetricName: aws.String(name),
Dimensions: makeDimensions(lvs...),
StatisticValues: stats(values),
Timestamp: aws.Time(now),
})
return true
})
var batches [][]types.MetricDatum
for len(datums) > 0 {
var batch []types.MetricDatum
lim := len(datums)
if lim > maxConcurrentRequests {
lim = maxConcurrentRequests
}
batch, datums = datums[:lim], datums[lim:]
batches = append(batches, batch)
}
var g errgroup.Group
for _, batch := range batches {
batch := batch
g.Go(func() error {
cw.sem <- struct{}{}
defer func() {
<-cw.sem
}()
_, err := cw.svc.PutMetricData(context.TODO(), &cloudwatch.PutMetricDataInput{
Namespace: aws.String(cw.namespace),
MetricData: batch,
})
return err
})
}
return g.Wait()
}
var zero = float64(0.0)
// Just build this once to reduce construction costs whenever
// someone does a Send with no aggregated values.
var zeros = types.StatisticSet{
Maximum: &zero,
Minimum: &zero,
Sum: &zero,
SampleCount: &zero,
}
func stats(a []float64) *types.StatisticSet {
count := float64(len(a))
if count == 0 {
return &zeros
}
var sum float64
var min = math.MaxFloat64
var max = math.MaxFloat64 * -1
for _, f := range a {
sum += f
if f < min {
min = f
}
if f > max {
max = f
}
}
return &types.StatisticSet{
Maximum: &max,
Minimum: &min,
Sum: &sum,
SampleCount: &count,
}
}
func makeDimensions(labelValues ...string) []types.Dimension {
dimensions := make([]types.Dimension, len(labelValues)/2)
for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 {
dimensions[j] = types.Dimension{
Name: aws.String(labelValues[i]),
Value: aws.String(labelValues[i+1]),
}
}
return dimensions
}
type observeFunc func(name string, lvs lv.LabelValues, value float64)
// Counter is a counter. Observations are forwarded to a node
// object, and aggregated per timeseries.
type Counter struct {
name string
lvs lv.LabelValues
obs observeFunc
}
// With implements metrics.Counter.
func (c *Counter) With(labelValues ...string) metrics.Counter {
return &Counter{
name: c.name,
lvs: c.lvs.With(labelValues...),
obs: c.obs,
}
}
// Add implements metrics.Counter.
func (c *Counter) Add(delta float64) {
c.obs(c.name, c.lvs, delta)
}