mirror of
https://github.com/go-kit/kit.git
synced 2025-07-17 01:12:38 +02:00
* Implement log/... packages with github.com/go-kit/log * Use github.com/go-kit/log/... in all the other packages
252 lines
6.8 KiB
Go
252 lines
6.8 KiB
Go
// Package cloudwatch2 emits all data as a StatisticSet (rather than
// a singular Value) to CloudWatch via the aws-sdk-go-v2 SDK.
|
|
package cloudwatch2
|
|
|
|
import (
|
|
"context"
|
|
"math"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
|
|
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
"github.com/go-kit/kit/metrics"
|
|
"github.com/go-kit/kit/metrics/internal/convert"
|
|
"github.com/go-kit/kit/metrics/internal/lv"
|
|
"github.com/go-kit/log"
|
|
)
|
|
|
|
// maxConcurrentRequests is the ceiling WithConcurrentRequests applies to the
// number of PutMetricData calls allowed in flight at once. Send also uses it
// as the number of datums placed in a single request.
const maxConcurrentRequests = 20
|
|
|
|
// CloudWatchAPI is an interface that defines the set of Amazon CloudWatch API operations required by CloudWatch.
|
|
type CloudWatchAPI interface {
|
|
PutMetricData(ctx context.Context, params *cloudwatch.PutMetricDataInput, optFns ...func(*cloudwatch.Options)) (*cloudwatch.PutMetricDataOutput, error)
|
|
}
|
|
|
|
// CloudWatch receives metrics observations and forwards them to CloudWatch.
|
|
// Create a CloudWatch object, use it to create metrics, and pass those metrics as
|
|
// dependencies to the components that will use them.
|
|
//
|
|
// To regularly report metrics to CloudWatch, use the WriteLoop helper method.
|
|
type CloudWatch struct {
|
|
mtx sync.RWMutex
|
|
sem chan struct{}
|
|
namespace string
|
|
svc CloudWatchAPI
|
|
counters *lv.Space
|
|
logger log.Logger
|
|
numConcurrentRequests int
|
|
}
|
|
|
|
// Option is a function adapter to change config of the CloudWatch struct
|
|
type Option func(*CloudWatch)
|
|
|
|
// WithLogger sets the Logger that will receive error messages generated
|
|
// during the WriteLoop. By default, no logger is used.
|
|
func WithLogger(logger log.Logger) Option {
|
|
return func(cw *CloudWatch) {
|
|
cw.logger = logger
|
|
}
|
|
}
|
|
|
|
// WithConcurrentRequests sets the upper limit on how many
|
|
// cloudwatch.PutMetricDataRequest may be under way at any
|
|
// given time. If n is greater than 20, 20 is used. By default,
|
|
// the max is set at 10 concurrent requests.
|
|
func WithConcurrentRequests(n int) Option {
|
|
return func(cw *CloudWatch) {
|
|
if n > maxConcurrentRequests {
|
|
n = maxConcurrentRequests
|
|
}
|
|
cw.numConcurrentRequests = n
|
|
}
|
|
}
|
|
|
|
// New returns a CloudWatch object that may be used to create metrics.
|
|
// Namespace is applied to all created metrics and maps to the CloudWatch namespace.
|
|
// Callers must ensure that regular calls to Send are performed, either
|
|
// manually or with one of the helper methods.
|
|
func New(namespace string, svc CloudWatchAPI, options ...Option) *CloudWatch {
|
|
cw := &CloudWatch{
|
|
namespace: namespace,
|
|
svc: svc,
|
|
counters: lv.NewSpace(),
|
|
numConcurrentRequests: 10,
|
|
logger: log.NewNopLogger(),
|
|
}
|
|
|
|
for _, optFunc := range options {
|
|
optFunc(cw)
|
|
}
|
|
|
|
cw.sem = make(chan struct{}, cw.numConcurrentRequests)
|
|
|
|
return cw
|
|
}
|
|
|
|
// NewCounter returns a counter. Observations are aggregated and emitted once
|
|
// per write invocation.
|
|
func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
|
|
return &Counter{
|
|
name: name,
|
|
obs: cw.counters.Observe,
|
|
}
|
|
}
|
|
|
|
// NewGauge returns an gauge. Under the covers, there is no distinctions
|
|
// in CloudWatch for how Counters/Histograms/Gauges are reported, so this
|
|
// just wraps a cloudwatch2.Counter.
|
|
func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
|
|
return convert.NewCounterAsGauge(cw.NewCounter(name))
|
|
}
|
|
|
|
// NewHistogram returns a histogram. Under the covers, there is no distinctions
|
|
// in CloudWatch for how Counters/Histograms/Gauges are reported, so this
|
|
// just wraps a cloudwatch2.Counter.
|
|
func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
|
|
return convert.NewCounterAsHistogram(cw.NewCounter(name))
|
|
}
|
|
|
|
// WriteLoop is a helper method that invokes Send every time the passed
|
|
// channel fires. This method blocks until ctx is canceled, so clients
|
|
// probably want to run it in its own goroutine. For typical usage, create a
|
|
// time.Ticker and pass its C channel to this method.
|
|
func (cw *CloudWatch) WriteLoop(ctx context.Context, c <-chan time.Time) {
|
|
for {
|
|
select {
|
|
case <-c:
|
|
if err := cw.Send(); err != nil {
|
|
cw.logger.Log("during", "Send", "err", err)
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Send will fire an API request to CloudWatch with the latest stats for
|
|
// all metrics. It is preferred that the WriteLoop method is used.
|
|
func (cw *CloudWatch) Send() error {
|
|
cw.mtx.RLock()
|
|
defer cw.mtx.RUnlock()
|
|
now := time.Now()
|
|
|
|
var datums []types.MetricDatum
|
|
|
|
cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
|
|
datums = append(datums, types.MetricDatum{
|
|
MetricName: aws.String(name),
|
|
Dimensions: makeDimensions(lvs...),
|
|
StatisticValues: stats(values),
|
|
Timestamp: aws.Time(now),
|
|
})
|
|
return true
|
|
})
|
|
|
|
var batches [][]types.MetricDatum
|
|
for len(datums) > 0 {
|
|
var batch []types.MetricDatum
|
|
lim := len(datums)
|
|
if lim > maxConcurrentRequests {
|
|
lim = maxConcurrentRequests
|
|
}
|
|
batch, datums = datums[:lim], datums[lim:]
|
|
batches = append(batches, batch)
|
|
}
|
|
|
|
var g errgroup.Group
|
|
for _, batch := range batches {
|
|
batch := batch
|
|
g.Go(func() error {
|
|
cw.sem <- struct{}{}
|
|
defer func() {
|
|
<-cw.sem
|
|
}()
|
|
_, err := cw.svc.PutMetricData(context.TODO(), &cloudwatch.PutMetricDataInput{
|
|
Namespace: aws.String(cw.namespace),
|
|
MetricData: batch,
|
|
})
|
|
return err
|
|
})
|
|
}
|
|
return g.Wait()
|
|
}
|
|
|
|
var zero = float64(0.0)
|
|
|
|
// Just build this once to reduce construction costs whenever
|
|
// someone does a Send with no aggregated values.
|
|
var zeros = types.StatisticSet{
|
|
Maximum: &zero,
|
|
Minimum: &zero,
|
|
Sum: &zero,
|
|
SampleCount: &zero,
|
|
}
|
|
|
|
func stats(a []float64) *types.StatisticSet {
|
|
count := float64(len(a))
|
|
if count == 0 {
|
|
return &zeros
|
|
}
|
|
|
|
var sum float64
|
|
var min = math.MaxFloat64
|
|
var max = math.MaxFloat64 * -1
|
|
for _, f := range a {
|
|
sum += f
|
|
if f < min {
|
|
min = f
|
|
}
|
|
if f > max {
|
|
max = f
|
|
}
|
|
}
|
|
|
|
return &types.StatisticSet{
|
|
Maximum: &max,
|
|
Minimum: &min,
|
|
Sum: &sum,
|
|
SampleCount: &count,
|
|
}
|
|
}
|
|
|
|
func makeDimensions(labelValues ...string) []types.Dimension {
|
|
dimensions := make([]types.Dimension, len(labelValues)/2)
|
|
for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 {
|
|
dimensions[j] = types.Dimension{
|
|
Name: aws.String(labelValues[i]),
|
|
Value: aws.String(labelValues[i+1]),
|
|
}
|
|
}
|
|
return dimensions
|
|
}
|
|
|
|
// observeFunc records one observed value for the metric identified by name
// and label values. Counter.Add calls it for every observation.
type observeFunc func(name string, lvs lv.LabelValues, value float64)
|
|
|
|
// Counter is a counter. Observations are forwarded to a node
|
|
// object, and aggregated per timeseries.
|
|
type Counter struct {
|
|
name string
|
|
lvs lv.LabelValues
|
|
obs observeFunc
|
|
}
|
|
|
|
// With implements metrics.Counter.
|
|
func (c *Counter) With(labelValues ...string) metrics.Counter {
|
|
return &Counter{
|
|
name: c.name,
|
|
lvs: c.lvs.With(labelValues...),
|
|
obs: c.obs,
|
|
}
|
|
}
|
|
|
|
// Add implements metrics.Counter.
|
|
func (c *Counter) Add(delta float64) {
|
|
c.obs(c.name, c.lvs, delta)
|
|
}
|